From f12c1467a035e5093e5fbf1f23b3978db63dbb12 Mon Sep 17 00:00:00 2001 From: "Steve L. Nyemba" Date: Tue, 29 Aug 2017 02:14:19 -0500 Subject: [PATCH] DC - Bug fix with interface --- init.sh | 2 +- src/data-collector.py | 1 + src/utils/agents/data-collector.py | 186 ----------------------------- src/utils/agents/manager.pyc | Bin 3745 -> 0 bytes 4 files changed, 2 insertions(+), 187 deletions(-) delete mode 100644 src/utils/agents/data-collector.py delete mode 100644 src/utils/agents/manager.pyc diff --git a/init.sh b/init.sh index daab9a4..2192d4f 100644 --- a/init.sh +++ b/init.sh @@ -36,7 +36,7 @@ upgrade(){ } start(){ - sandbox/bin/python src/utils/agents/data-collector.py --path $PWD/config.json + sandbox/bin/python src/data-collector.py --path $PWD/config.json } stop(){ diff --git a/src/data-collector.py b/src/data-collector.py index 1adf133..9cae058 100644 --- a/src/data-collector.py +++ b/src/data-collector.py @@ -46,6 +46,7 @@ class Collector(Thread) : """ #self.monitor.start() + thread = Thread(target=self.monitor.run) thread.start() # print self.monitor.config['store'] diff --git a/src/utils/agents/data-collector.py b/src/utils/agents/data-collector.py deleted file mode 100644 index 6e451df..0000000 --- a/src/utils/agents/data-collector.py +++ /dev/null @@ -1,186 +0,0 @@ -""" - This is the implementation of a data collection agent - The agent's role is intended to : - - collect data associated with folder and processes - - The agent will also perform various learning tasks - - Usage: - python --path --delay xxx --procs p1,p2,p3 --folders path1,path2 -""" -from threading import Thread, RLock -from utils.params import PARAMS -import os -import json -import time -from datetime import datetime -from utils.transport import * -import monitor -class Manager(Thread) : - """ - delay : - limit : - scope : apps,folders,learner,sandbox - """ - def __init__(self): - Thread.__init__(self) - self.lock = RLock() - self.factory = DataSourceFactory() - def init(self,args) : - node,pool,config - self.id = args['node'] - self.pool = args['pool'] - self.config = args['config'] - self.key = args['key'] - - self.status() #-- Initializing status information - def status(self) : - """ - This method inspect the plans for the current account and makes sure it can/should proceed - The user must be subscribed and to the service otherwise this is not going to work - """ - url="https://the-phi.com/store/status/monitor" - r = requests.post(url,headers={"uid":self.key}) - plans = json.loads(r.text) - - meta = [item['metadata'] for item in plans if item['status']=='active' ] - if len(meta) > 0 : - self.DELAY = 60* max([ int(item['delay']) for item in meta if ]) - self.LIMIT = max([ int(item['limit']) for item in meta if ]) - else: - self.DELAY = -1 - self.LIMIT = -1 - scope = [] - [ scope += item['scope'].split(',') for item in meta ] - names = [ for agent in self.pool if agent.getName() in scope] - return meta - - def isvalid(self): - self.status() - return self.DELAY > -1 and self.LIMIT > -1 - def run(self): - #DELAY=35*60 #- 35 Minutes - #LIMIT=1000 - COUNT = 0 - COUNT_STOP = int(24*60/ self.DELAY) - print COUNT_STOP - write_class = self.config['store']['class']['write'] - read_args = self.config['store']['args'] - - while True : - COUNT += 1 - if COUNT > COUNT_STOP : - if self.isvalid() : - COUNT = 0 - else: - break - for agent in self.pool : - - data = agent.composite() - label = agent.getName() - node = '@'.join([label,self.id]) - row = {} - if label == 'folders': - row = [ dict({"id":self.id}, **_row) for _row in data] - - else: - label = id - row = data - - self.lock.acquire() - store = self.factory.instance(type=write_class,args=read_args) - store.flush(size=self.LIMIT) - store.write(label=label,row=row) - self.lock.release() - time.sleep(self.DELAY) - - -class ICollector(Thread) : - - def __init__(self) : - Thread.__init__(self) - self.folders = None - self.procs = None - self.config = None - self.pool = [] - self.lock = RLock() - self.factory = DataSourceFactory() - self.init() - self.name = 'data-collector@'+self.id - def init(self): - - - # - # data store configuration (needs to be in a file) - # - path = PARAMS['path'] - if os.path.exists(path) : - f = open(path) - self.config = json.loads(f.read()) - #if 'store' in self.config : - # self.config = self.config['store'] - f.close() - self.id = self.config['id'] #PARAMS['id'] - if 'folders' in self.config : #PARAMS : - folders = self.config['folders'] #PARAMS['folders'].split(',') - self.register('monitor.FileWatch',folders) - if 'procs' in self.config : #PARAMS : - procs = self.config['procs'] #PARAMS['procs'].split(',') - self.register('monitor.DetailProcess',procs) - - self.quit = False - #self.DELAY = PARAMS['delay']*60 - self.DELAY = self.config['delay'] - - """ - This function returns an instance of a data collector class : - ProcessDetails, FileWatch, ... provided the class name - """ - def register(self,className,params) : - try: - - agent = eval(className+"()") - agent.init(params) - self.pool.append( agent ) - except Exception,e: - print e - def stop(self): - self.quit = True - def run(self): - write_class = self.config['store']['class']['write'] - read_args = self.config['store']['args'] - DELAY = self.config['delay'] * 60 - while self.quit == False: - - for thread in self.pool : - id = "@".join([thread.getName(),self.id]) - - data = thread.composite() - label = thread.getName() - row = {} - if label == 'folders': - row = [ dict({"id":self.id}, **_row) for _row in data] - - else: - label = id - row = data - - self.lock.acquire() - store = self.factory.instance(type=write_class,args=read_args) - store.flush(size=200) - store.write(label=label,row=row) - self.lock.release() - if 'MONITOR_CONFIG_PATH' in os.environ : - break - print '\t *** ',str(datetime.today()),' ** ' - time.sleep(DELAY) - - print ' *** Exiting ',self.name - # read_class=self.config['class']['read'] - # store = self.factory.instance(type=write_class,args=read_args) - # store.flush() - - -if __name__ == '__main__': - thread = ICollector() - # thread.daemon = True - thread.start() \ No newline at end of file diff --git a/src/utils/agents/manager.pyc b/src/utils/agents/manager.pyc deleted file mode 100644 index c7f9129b75895349e3785b4aee8164e124803deb..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 3745 zcmb_fO>Z2>5v|@Ya+jn;QL-Y!aWp<8Ss0eqP5|diP#jB%V%VmPAp=1KlErYQ$(`lw z%yPPiqyp|CV6Hjj2L#AZ$Sr>(S0neF1LTrh@?JG5@sdl-aN$#_P^E! z?>_(I%Tz8tZ9G4YMBD;GhOd!AWM*X7h+)4aMN4*D`VMkicH5V7M|L|Zx25RHtSh_S zFsCC$Pj)V5FUjuG<&0(7U6!mX8Af{Ydy#XIm%Dv>*&Z$fLH-|nh6a*nH_MN6msjQ1 zm7&C2pU-Rqbzpq+Nrb?jL9oJj00=G_i%|iAv{XP8Z54p5qe71; zseB0(xZ~|19i_B<`FlbS9Sp$a_OJzE>i`Wt|NQBYj{uVz&KCP&h~w`2XhxH2W=GE3 zV>`<1@n}|+xv%PxtJ6{A^TLe~Qt91j7J_>?f2FY=+nQNfp&j}Z^BNSp3ThpK*94!5 zj5YXe1+%3v{VlwIn)qb9YU4nTpU8W|_>a??O0EipE|8XqtzsyL?)c^2QmBfiN-Hv#84B&>0|@^2EB)CcYw(!|_z zW$)3?DjG3s7k{f^WA-t|(s>d=<4;&dq-$Zg&eM>imgEpZO4{<#xiO2DAWEbK+hE$2 zb4+LpO6#mGPk$ZB0xwXtNGDQw*pm9O_ItAVo0c3smPJ>PuQQzpvN`NXJv7-ew-1eU zVdO52fMsZ>FPredspoWc2T;cS|D`;5t^5}%Q+Mzh+Jh@rfhG&e3~it|zfylI0q0jF z!p#MM;tQ1($yPCE4Zj=msN9h3rkV#q20{HSmgMx?A%=G}5ld=wT{fqylHDSvZ$Rhh znR&rjVRsGTeoGdJby=o2xYN#%lb+PI!3e{KfH2l*b$h4<`d6&ipx^KBOmY{YuqIU& z<)xe3)JJ||qj`~(F50hZ?WIj!gQ=n>gKY8&M~N|SQrCRI~p(Y&rw zYqMb8Mvb#|G;5rX_AJ!*Tw3RQHd7nEQuEL_&Qlvz@LZqd&SDg>@GUDJ9aMRF5JB~% zst;jRH}%B(x%=^Ggc%R!ll);?&A_xM6|}5{OR`l$nb;%)r)pg_sG!>jUh551m>*j` zi;#mP17YTPC;204u;?;dB%u(Ce3tuofU%e?7T=)?i!a+FHR;h7DmJa=)|1WNAF@o< zd~J^!>wLg)V11}v*4ETjrOqp=By-9K-ky2|TOjEqgIRK_&%E@i`t;|Y|LmnopMUzr zr#s39!S}oXNHT;54o6$SHe`hcVxYCAE)-s^Q*@=(ZSL(%1?D3DCg;3EOU#3wR9~M> zH1+pk&qZjY+cWpgM(csuFdvxr&0}+~-8X&H1HZg&K74cR5w)_*cOY=f>&eGW4Tb^R zfB-7g47z&yqkC~$hYJU%tLON;lo-af9)C+Pm^++HdG3~vR8ahXKhn`xVg8tq(%<|Oha8ad) ziYZBtn!L6#_n1-*Xxt}d8sO{~jhpC>k9j548MQ5Jf}0O9@UE>vyTaP}kef9S1&jX4 z^IvW4U|}z$@%GO1FFmiNnxS#X7-82S*fH-L@dxOsVRC8AYKYg^{OEFgjI?Odzdo6S zEH~v(VB68Lw0q{ZdCS}ahYZXr{`#$sSy$;ESY_3WI)Phmo4e3bUeuh8$K>$wSc4sp zXI0h|=*G-Kyv#;3qbteNBP`ozkluu$(*Jw$kI7NYwBugrnf_A0({G{O?6-ppzPy09 z-*_CmfQO-QEv#JpF68(L8{M?rW%^IXLto=4pI5cl#lQF`Bqj-~(`yPq)A%SP?lEtw Pj