You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

128 lines
3.3 KiB
Python

"""
This is the implementation of a data collection agent.
The agent is intended to:
- collect data associated with folders and processes
- perform various learning tasks
Usage:
python --path <config> --delay xxx --procs p1,p2,p3 --folders path1,path2
"""
from threading import Thread, RLock
from utils.params import PARAMS
import os
import json
import time
from datetime import datetime
from utils.transport import *
import monitor
class ICollector(Thread):
    """
    Data collection thread.

    On construction the JSON configuration file found at PARAMS['path'] is
    loaded and one monitoring agent is registered for each of the 'folders'
    and 'procs' entries it contains. Once started, the thread repeatedly asks
    every registered agent for its data, pushes that data to a QueueWriter
    and to the store built by the DataSourceFactory, then sleeps.
    """

    def __init__(self):
        Thread.__init__(self)
        self.folders = None
        self.procs = None
        self.config = None
        self.pool = []        # agents added by register()
        self.lock = RLock()   # guards the store writes performed in run()
        self.factory = DataSourceFactory()
        self.init()
        # init() reads the identifier from the configuration, so the thread
        # name can only be composed afterwards.
        self.name = 'data-collector@' + self.id

    def init(self):
        """
        Load the configuration file and register the agents it declares.

        Sets self.config, self.id, self.quit and self.DELAY. A
        'monitor.FileWatch' agent is registered when the configuration has a
        'folders' entry and a 'monitor.DetailProcess' agent when it has a
        'procs' entry.

        Raises:
            IOError: the file designated by PARAMS['path'] does not exist.
        """
        #
        # data store configuration (needs to be in a file)
        #
        path = PARAMS['path']
        if not os.path.exists(path):
            # Without a configuration nothing below can work; fail here with
            # an explicit message instead of a later error on a None config.
            raise IOError('configuration file not found: ' + str(path))
        # The context manager closes the file even when the JSON is invalid.
        with open(path) as f:
            self.config = json.load(f)

        self.id = self.config['id']
        if 'folders' in self.config:
            self.register('monitor.FileWatch', self.config['folders'])
        if 'procs' in self.config:
            self.register('monitor.DetailProcess', self.config['procs'])

        self.quit = False
        # NOTE(review): run() sleeps for config['delay'] * 60 and does not
        # read this attribute - confirm whether anything else relies on it.
        self.DELAY = self.config['delay']
        #
        # TODO: we need to instantiate the actor orchestrator
        #

    def register(self, className, params):
        """
        Create a data collector (ProcessDetails, FileWatch, ...) from its
        class name and add it to the pool.

        className is the dotted name of the class to instantiate (e.g.
        'monitor.FileWatch'); params is handed to the new agent's init().
        Nothing is returned. Any error raised while creating or initializing
        the agent is printed and the agent is left out of the pool.
        """
        try:
            # NOTE(review): eval() executes whatever className contains. The
            # callers in this class only pass fixed strings; never forward a
            # value read from the configuration or from user input.
            agent = eval(className + "()")
            agent.init(params)
            self.pool.append(agent)
        except Exception as e:
            # Best effort: one failing agent must not prevent the others
            # from being registered.
            print(e)

    def stop(self):
        """Ask run() to leave its loop; checked at the start of each cycle."""
        self.quit = True

    def run(self):
        """
        Collection loop executed by Thread.start().

        For every agent in the pool, its composite() data is wrapped in a
        message sent through a QueueWriter and is also written to the store
        created by the factory. The loop then sleeps config['delay'] * 60
        seconds and starts over until stop() is called. When the
        MONITOR_CONFIG_PATH environment variable is set, the loop ends after
        a single pass without sleeping.
        """
        write_class = self.config['store']['class']['write']
        read_args = self.config['store']['args']
        DELAY = self.config['delay'] * 60
        while not self.quit:
            for thread in self.pool:
                agent_name = thread.getName()
                data = thread.composite()
                if agent_name == 'folders':
                    # every folder row is tagged with this collector's id
                    label = agent_name
                    row = [dict({"id": self.id}, **_row) for _row in data]
                else:
                    label = "@".join([agent_name, self.id])
                    row = data
                #
                # At this point we should check for the status and if it prompts an action
                # @TODO Use a design pattern for this ... (Aggregation?)
                #   - submit the row to Event for analysis
                #   - The event orchestrator will handle things from this point on
                #
                message = {'to': agent_name, 'content': row}
                qwriter = QueueWriter(host=self.config['api'], uid=self.config['key'], qid=self.id)
                try:
                    qwriter.write(label=self.id, row=message)
                finally:
                    # close the writer even when the write fails
                    qwriter.close()

                # "with" releases the lock even if the store raises; a bare
                # acquire()/release() pair would leave it held forever.
                with self.lock:
                    store = self.factory.instance(type=write_class, args=read_args)
                    store.flush(size=200)
                    store.write(label=label, row=row)

            if 'MONITOR_CONFIG_PATH' in os.environ:
                break
            # A single joined string prints the same text on Python 2 and 3.
            print(' '.join(['\t *** ', str(datetime.today()), ' ** ']))
            time.sleep(DELAY)
        print(' '.join([' *** Exiting ', self.name]))
if __name__ == '__main__':
    # Script entry point: build the collector (which loads its configuration)
    # and launch its collection loop on a separate thread.
    collector = ICollector()
    # The daemon flag is left at its default:
    # collector.daemon = True
    collector.start()