You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
	
	
		
			127 lines
		
	
	
		
			3.3 KiB
		
	
	
	
		
			Python
		
	
			
		
		
	
	
			127 lines
		
	
	
		
			3.3 KiB
		
	
	
	
		
			Python
		
	
"""
 | 
						|
	This is the implementation of a data collection agent.

	The agent is intended to:
		- collect data associated with folders and processes;
		- perform various learning tasks.

	Usage:
	python <script> --path <config>

	The agent id, the folders and processes to monitor, and the polling
	delay (in minutes) are read from the JSON configuration file given by
	--path (keys "id", "folders", "procs", "delay").
 | 
						|
"""
 | 
						|
import importlib
import json
import os
import time
from datetime import datetime
from threading import Thread, RLock, Event

from utils.params import PARAMS
from utils.transport import *
import monitor

class ICollector(Thread):
    """Background thread that polls registered monitors and ships their data.

    On construction the collector loads its JSON configuration from the file
    named by ``PARAMS['path']`` and registers ``monitor.FileWatch`` for the
    ``folders`` entry and ``monitor.DetailProcess`` for the ``procs`` entry.
    ``run()`` then repeatedly asks every registered monitor for its data,
    publishes it through a ``QueueWriter`` and writes it to the store built by
    ``DataSourceFactory``, waiting ``config['delay']`` minutes between rounds
    until ``stop()`` is called.
    """

    def __init__(self):
        Thread.__init__(self)
        self.folders = None
        self.procs = None
        self.config = None
        self.pool = []        # registered monitor instances, polled by run()
        self.lock = RLock()   # guards the data-store writes in run()
        self.factory = DataSourceFactory()
        # Set by stop() so run() wakes up immediately instead of finishing
        # its current wait between collection rounds.
        self._wakeup = Event()
        self.init()
        # self.id is only known once init() has read the configuration;
        # '%s' also tolerates a non-string id from the JSON file.
        self.name = 'data-collector@%s' % (self.id,)

    def init(self):
        """Load the JSON configuration and register the configured monitors.

        Sets ``self.config``, ``self.id``, ``self.quit`` and ``self.DELAY``
        (the raw ``delay`` value from the configuration).

        Raises:
            IOError: the configuration file named by ``PARAMS['path']``
                does not exist.
            KeyError: a required key (``id`` or ``delay``) is missing.
        """
        path = PARAMS['path']
        if not os.path.exists(path):
            raise IOError('configuration file not found: %s' % path)
        with open(path) as config_file:
            self.config = json.load(config_file)

        self.id = self.config['id']
        if 'folders' in self.config:
            self.register('monitor.FileWatch', self.config['folders'])
        if 'procs' in self.config:
            self.register('monitor.DetailProcess', self.config['procs'])

        self.quit = False
        self._wakeup.clear()
        self.DELAY = self.config['delay']

    @staticmethod
    def _resolve_class(className):
        """Return the object named by ``className`` without using eval().

        A dotted name is resolved as ``<module>.<attribute>``; a bare name is
        looked up in this module's globals.
        """
        module_name, _, attr_name = className.rpartition('.')
        if not module_name:
            return globals()[attr_name]
        return getattr(importlib.import_module(module_name), attr_name)

    def register(self, className, params):
        """Instantiate a monitor class by dotted name and add it to the pool.

        ``className`` is a dotted path such as ``'monitor.FileWatch'``; the
        class is instantiated with no arguments and then configured with
        ``agent.init(params)``. Registration is best-effort: any failure is
        printed and the monitor is simply not added to ``self.pool``.
        """
        try:
            agent_class = self._resolve_class(className)
            agent = agent_class()
            agent.init(params)
        except Exception as e:
            print('Unable to register %s: %s' % (className, e))
            return
        self.pool.append(agent)

    def stop(self):
        """Ask run() to exit, interrupting the wait between rounds."""
        self.quit = True
        self._wakeup.set()

    def _build_row(self, agent):
        """Return ``(label, row)`` describing one monitor's current data.

        Data from the monitor named ``'folders'`` is a sequence of records;
        each becomes a dict prefixed with this agent's id (a record's own
        ``'id'`` key takes precedence) and is stored under ``'folders'``.
        Any other monitor's data is stored unchanged under the label
        ``'<monitor name>@<agent id>'``.
        """
        data = agent.composite()
        name = agent.getName()
        if name != 'folders':
            return '%s@%s' % (name, self.id), data
        rows = []
        for record in data:
            merged = {'id': self.id}
            merged.update(record)
            rows.append(merged)
        return name, rows

    def _publish(self, recipient, row):
        """Send ``row`` to the message queue, addressed to ``recipient``."""
        message = {'to': recipient, 'content': row}
        qwriter = QueueWriter(host=self.config['api'], uid=self.config['key'], qid=self.id)
        try:
            qwriter.write(label=self.id, row=message)
        finally:
            # Close the queue connection even when the write fails.
            qwriter.close()

    def _persist(self, write_class, read_args, label, row):
        """Write ``row`` under ``label`` to the configured data store."""
        # 'with' releases the lock even if the store raises.
        with self.lock:
            store = self.factory.instance(type=write_class, args=read_args)
            # NOTE(review): flush(size=200) runs before every write; confirm
            # the intended semantics against the store implementation.
            store.flush(size=200)
            store.write(label=label, row=row)

    def run(self):
        """Poll every registered monitor until stop() is called.

        Each round, for every monitor in ``self.pool``: build its row,
        publish it to the message queue, then write it to the data store.
        Between rounds the thread waits ``config['delay']`` minutes (or until
        stop() is called). When ``MONITOR_CONFIG_PATH`` is set in the
        environment, only a single round is performed.
        """
        write_class = self.config['store']['class']['write']
        read_args = self.config['store']['args']
        delay_seconds = self.config['delay'] * 60

        while not self.quit:
            for agent in self.pool:
                label, row = self._build_row(agent)
                # @TODO: submit the row to an event orchestrator that decides
                # whether the monitored status calls for an action.
                self._publish(agent.getName(), row)
                self._persist(write_class, read_args, label, row)

            if 'MONITOR_CONFIG_PATH' in os.environ:
                break
            print('\t ***  %s  ** ' % datetime.today())
            self._wakeup.wait(delay_seconds)

        print(' *** Exiting  %s' % self.name)

if __name__ == '__main__':
    # Build the collector (this loads its configuration and registers its
    # monitors) and start polling. The thread is not marked as a daemon, so
    # the interpreter keeps running until run() returns.
    collector = ICollector()
    collector.start()