DC - Bug fix with interface

data-collector
Steve L. Nyemba 7 years ago
parent 3ea814349d
commit f12c1467a0

@ -36,7 +36,7 @@ upgrade(){
}
start(){
sandbox/bin/python src/utils/agents/data-collector.py --path $PWD/config.json
sandbox/bin/python src/data-collector.py --path $PWD/config.json
}
stop(){

@ -46,6 +46,7 @@ class Collector(Thread) :
"""
#self.monitor.start()
thread = Thread(target=self.monitor.run)
thread.start()
# print self.monitor.config['store']

@ -1,186 +0,0 @@
"""
This is the implementation of a data collection agent
The agent's role is intended to :
- collect data associated with folder and processes
- The agent will also perform various learning tasks
Usage:
python --path <config> --delay xxx --procs p1,p2,p3 --folders path1,path2
"""
from threading import Thread, RLock
from utils.params import PARAMS
import os
import json
import time
from datetime import datetime
from utils.transport import *
import monitor
class Manager(Thread) :
"""
delay : <value>
limit : <value>
scope : apps,folders,learner,sandbox
"""
def __init__(self):
Thread.__init__(self)
self.lock = RLock()
self.factory = DataSourceFactory()
def init(self,args) :
node,pool,config
self.id = args['node']
self.pool = args['pool']
self.config = args['config']
self.key = args['key']
self.status() #-- Initializing status information
def status(self) :
"""
This method inspect the plans for the current account and makes sure it can/should proceed
The user must be subscribed and to the service otherwise this is not going to work
"""
url="https://the-phi.com/store/status/monitor"
r = requests.post(url,headers={"uid":self.key})
plans = json.loads(r.text)
meta = [item['metadata'] for item in plans if item['status']=='active' ]
if len(meta) > 0 :
self.DELAY = 60* max([ int(item['delay']) for item in meta if ])
self.LIMIT = max([ int(item['limit']) for item in meta if ])
else:
self.DELAY = -1
self.LIMIT = -1
scope = []
[ scope += item['scope'].split(',') for item in meta ]
names = [ for agent in self.pool if agent.getName() in scope]
return meta
def isvalid(self):
self.status()
return self.DELAY > -1 and self.LIMIT > -1
def run(self):
#DELAY=35*60 #- 35 Minutes
#LIMIT=1000
COUNT = 0
COUNT_STOP = int(24*60/ self.DELAY)
print COUNT_STOP
write_class = self.config['store']['class']['write']
read_args = self.config['store']['args']
while True :
COUNT += 1
if COUNT > COUNT_STOP :
if self.isvalid() :
COUNT = 0
else:
break
for agent in self.pool :
data = agent.composite()
label = agent.getName()
node = '@'.join([label,self.id])
row = {}
if label == 'folders':
row = [ dict({"id":self.id}, **_row) for _row in data]
else:
label = id
row = data
self.lock.acquire()
store = self.factory.instance(type=write_class,args=read_args)
store.flush(size=self.LIMIT)
store.write(label=label,row=row)
self.lock.release()
time.sleep(self.DELAY)
class ICollector(Thread) :
def __init__(self) :
Thread.__init__(self)
self.folders = None
self.procs = None
self.config = None
self.pool = []
self.lock = RLock()
self.factory = DataSourceFactory()
self.init()
self.name = 'data-collector@'+self.id
def init(self):
#
# data store configuration (needs to be in a file)
#
path = PARAMS['path']
if os.path.exists(path) :
f = open(path)
self.config = json.loads(f.read())
#if 'store' in self.config :
# self.config = self.config['store']
f.close()
self.id = self.config['id'] #PARAMS['id']
if 'folders' in self.config : #PARAMS :
folders = self.config['folders'] #PARAMS['folders'].split(',')
self.register('monitor.FileWatch',folders)
if 'procs' in self.config : #PARAMS :
procs = self.config['procs'] #PARAMS['procs'].split(',')
self.register('monitor.DetailProcess',procs)
self.quit = False
#self.DELAY = PARAMS['delay']*60
self.DELAY = self.config['delay']
"""
This function returns an instance of a data collector class :
ProcessDetails, FileWatch, ... provided the class name
"""
def register(self,className,params) :
try:
agent = eval(className+"()")
agent.init(params)
self.pool.append( agent )
except Exception,e:
print e
def stop(self):
self.quit = True
def run(self):
write_class = self.config['store']['class']['write']
read_args = self.config['store']['args']
DELAY = self.config['delay'] * 60
while self.quit == False:
for thread in self.pool :
id = "@".join([thread.getName(),self.id])
data = thread.composite()
label = thread.getName()
row = {}
if label == 'folders':
row = [ dict({"id":self.id}, **_row) for _row in data]
else:
label = id
row = data
self.lock.acquire()
store = self.factory.instance(type=write_class,args=read_args)
store.flush(size=200)
store.write(label=label,row=row)
self.lock.release()
if 'MONITOR_CONFIG_PATH' in os.environ :
break
print '\t *** ',str(datetime.today()),' ** '
time.sleep(DELAY)
print ' *** Exiting ',self.name
# read_class=self.config['class']['read']
# store = self.factory.instance(type=write_class,args=read_args)
# store.flush()
if __name__ == '__main__':
thread = ICollector()
# thread.daemon = True
thread.start()

Binary file not shown.
Loading…
Cancel
Save