Bug fix and enhancement with folder monitoring

data-collector
Steve Nyemba 7 years ago
parent 726895d862
commit c3ce3227ff

@ -21,7 +21,7 @@ from threading import Thread, RLock
import monitor import monitor
import utils.agents.actor as actor import utils.agents.actor as actor
from utils.agents.manager import Manager from utils.agents.manager import Manager
SYS_ARGS['host']='localhost' SYS_ARGS['host']='localhost'/
ENDPOINT="http://:host/monitor".replace(":host",SYS_ARGS['host']) ENDPOINT="http://:host/monitor".replace(":host",SYS_ARGS['host'])
class Collector(Thread) : class Collector(Thread) :
def __init__(self): def __init__(self):

@ -29,7 +29,7 @@ class Analysis:
self.now = {"month":d.month,"year":d.year, "day":d.day,"hour":d.hour,"minute":d.minute} self.now = {"month":d.month,"year":d.year, "day":d.day,"hour":d.hour,"minute":d.minute}
def getNow(self): def getNow(self):
d = datetime.datetime.now() d = datetime.datetime.now()
return {"month":d.month,"year":d.year, "day":d.day,"hour":d.hour,"minute":d.minute} return {"month":d.month,"year":d.year, "day":d.day,"hour":d.hour,"minute":d.minute,"second":d.second}
def getName(self): def getName(self):
return self.__class__.__name__ return self.__class__.__name__
def reboot(self,row,conf) : def reboot(self,row,conf) :
@ -167,7 +167,6 @@ class DetailProcess(Analysis):
def init (self,names): def init (self,names):
#Analysis.init(self) #Analysis.init(self)
self.names = names; self.names = names;
def getName(self): def getName(self):
return "apps" return "apps"
@ -218,7 +217,7 @@ class DetailProcess(Analysis):
cmd = "ps -eo user,pid,pmem,pcpu,vsize,stat,command|grep -Ei \":app\"".replace(":app",name) cmd = "ps -eo user,pid,pmem,pcpu,vsize,stat,command|grep -Ei \":app\"".replace(":app",name)
handler = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE) handler = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE)
logs = handler.communicate()[0].split('\n') logs = handler.communicate()[0].split('\n')
logs = [row for row in logs if (row.strip() != '') and ('grep -Ei' in row )== False and (__file__ not in row)] logs = [row for row in logs if (row.strip() != '') and ('grep -Ei' in row )== False ]
if len(logs) == 0: if len(logs) == 0:
return [dict(self.parse('',fields),**{'label':name}) ] return [dict(self.parse('',fields),**{'label':name}) ]
@ -257,111 +256,41 @@ class FileWatch(Analysis):
pass pass
def init(self,folders): def init(self,folders):
self.folders = folders; self.folders = folders;
self.cache = []
def getName(self): def getName(self):
return "folders" return "folders"
def split(self,row):
x = row.split(' ')
r = {}
months = ['Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sep','Oct','Nov','Dec']
if x:
BYTES_TO_MB = 1000000
size = int(x[0])/BYTES_TO_MB
month = months.index(x[1]) + 1
day = int(x[2])
age = -1
hour=minute = 0
if ':' in x[3] :
hour,minute = x[3].split(':')
now = datetime.datetime.today()
if month == now.month :
year = now.year
else:
year = now.year - 1
else:
year = int(x[3])
hour = 0
minute = 0
file_date = datetime.datetime(year,month,day,int(hour),int(minute)) def evaluate(self,dir_path,r=[]):
# size = round(size,2) """
#file_date = datetime.datetime(year,month,day,hour,minute) This function will recursively scan a folder and retrieve file sizes and age of the files.
now = datetime.datetime.now() The data will be returned as an array of {size,age,label} items
age = (now - file_date ).days """
for child in os.listdir(dir_path):
return {"size":size,"age":age} path = os.path.join(dir_path, child)
return None if os.path.isdir(path):
self.evaluate(path,r)
def evaluate(self,path): else:
cmd = "find :path -print0|xargs -0 ls -ls |awk '{print $6,$7,$8,$9,$10}'".replace(":path",path) size = os.path.getsize(path)
handler = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE) file_date = os.path.getatime(path)
ostream = handler.communicate()[0].split('\n') file_date = datetime.datetime.fromtimestamp(file_date)
ostream = [row for row in ostream if row.strip() != ''] now = datetime.datetime.now()
#return [self.split(stream) for stream in ostream if stream.strip() != '' and '.DS_Store' not in stream and 'total' not in stream] age = (now - file_date ).days
#return [self.split(stream) for stream in ostream if path not in stream and not set(['','total','.DS_Store']) & set(stream.split(' '))] r.append({"label":path,"size":size,"age":age,"date":self.getNow()})
return [] return r
def toMB(self,size):
m = {'GB':1000,'TB':1000000}
v,u = size.split(' ')
return round(float(v)* m[u.upper()],2)
def reboot(self,rows,limit) : def reboot(self,rows,limit) :
return np.sum([ int(self.toMB(item['size']) > self.toMB(limit)) for item in rows]) > 0 return np.sum([ 1 for item in rows if rows['size'] > limit ]) > 0
def composite(self): def composite(self):
d = [] #-- vector of details (age,size) out = []
for folder in self.folders :
now = datetime.datetime.today()
for folder in self.folders: r = self.evaluate(folder,[])
if os.path.exists(folder): file_count = len(r)
xo_raw = self.evaluate(folder) age = {"mean":np.mean([item['age'] for item in r] ),"var":np.mean([item['age'] for item in r])}
xo = np.array(ML.Extract(['size','age'],xo_raw)) size = {"mean":np.mean([item['size'] for item in r] ),"var":np.mean([item['size'] for item in r])}
if len(xo) == 0: out.append({"label":folder,"stats":{"age":age,"size":size,"file_count":file_count},"logs":r})
continue return out
name = re.findall("([a-z,A-Z,0-9]+)",folder)
name = folder.split(os.sep)
if len(name) == 1:
name = [folder]
else:
i = len(name) -1
name = [name[i-1]+' '+name[i]]
name = name[0]
size = round(np.sum(xo[:,0]),2)
if size > 1000 :
size = round(size/1000,2)
units = ' GB'
elif size > 1000000:
size = round(size/1000000,2)
units = ' TB'
else:
size = size
units = ' MB'
size = str(size)+ units
age = round(np.mean(xo[:,1]),2)
if age > 30 and age <= 365 :
age = round(age/30,2)
units = ' Months'
elif age > 365 :
age = round(age/365,2)
units = ' Years'
else:
age = age
units = ' Days'
age = str(age)+units
N = len(xo[:,1])
xo = {"label":folder} #,"details":xo_raw,"summary":{"size":size,"age":age,"count":len(xo[:,1])}}
xo = dict(xo,**{"size":size,"age":age,"count":N})
xo["name"] = name
xo['day'] = now.day
xo['month'] = now.month
xo['year'] = now.year
xo['date'] = time.mktime(now.timetuple())
d.append(xo)
return d
# class Monitor (Thread): # class Monitor (Thread):

@ -193,8 +193,8 @@ class Folders(Actor):
@pre : isValid @pre : isValid
""" """
folder = item['label'] folder = item['label']
name = folder.split(os.sep) name = folder.split(os.sep)
name = name[len(name)-1] name = name[len(name)-1]
signature='-'.join([name,str(item['date']),str(item['count']),'files']) signature='-'.join([name,str(item['date']),str(item['count']),'files'])
tarball=os.sep.join([folder,'..',signature]) tarball=os.sep.join([folder,'..',signature])
shutil.make_archive(tarball,'tar',folder) shutil.make_archive(tarball,'tar',folder)

@ -4,7 +4,7 @@ from monitor import Env, DetailProcess, ProcessCounter, Sandbox, FileWatch
import monitor import monitor
import os import os
import json import json
from utils.workers import Top, Learner # from utils.workers import Top, Learner
#from multiprocessing import Lock #from multiprocessing import Lock
from threading import Lock from threading import Lock
path = os.environ['MONITOR_CONFIG_PATH'] path = os.environ['MONITOR_CONFIG_PATH']
@ -29,8 +29,8 @@ class TestMonitorServer(unittest.TestCase):
p = DetailProcess() p = DetailProcess()
p.init(['kate','rabbitmq-server','python','apache2','firefox']) p.init(['kate','rabbitmq-server','python','apache2','firefox'])
r = p.composite() r = p.composite()
for row in r: #for row in r:
print row['label'],row['status'],row['proc_count'] # print row['label'],row['status'], sum([1 for item in r if item['label']==row['label']])
self.assertTrue(r) self.assertTrue(r)
def test_ProcessCount(self): def test_ProcessCount(self):
@ -46,22 +46,13 @@ class TestMonitorServer(unittest.TestCase):
p = Sandbox() p = Sandbox()
p.init({"sandbox":sandbox_path,"requirements":requirements_path}) p.init({"sandbox":sandbox_path,"requirements":requirements_path})
p.composite() p.composite()
def test_StartTop(self):
lock = Lock()
p = Top(CONFIG,lock)
#p.start()
#p.join()
def test_StartLearner(self): def test_StartLearner(self):
lock = Lock() pass
p = Learner(CONFIG,lock)
p.start()
def test_FileWatch(self): def test_FileWatch(self):
conf =CONFIG['monitor']['folder']
path =os.environ['FILE_PATH']
fw = FileWatch() fw = FileWatch()
fw.init([path]) fw.init(CONFIG['folders'])
print fw.composite() #r = fw.evaluate('/Users/steve/git/resume')
fw.composite()
if __name__ == '__main__' : if __name__ == '__main__' :
unittest.main() unittest.main()

Loading…
Cancel
Save