From 00aeade567fb0ea72250579d6199b26f3a7b9e55 Mon Sep 17 00:00:00 2001 From: Gogs Date: Mon, 14 Aug 2017 05:38:09 +0000 Subject: [PATCH] CO - Setup remote object and tied to service. @TODO: insure that monitor-logs is tied to the plan-id --- src/api/index.py | 29 +++++++++++++++++++-------- src/utils/agents/manager.py | 39 +++++++++++++++++++------------------ src/utils/transport.py | 2 ++ 3 files changed, 43 insertions(+), 27 deletions(-) diff --git a/src/api/index.py b/src/api/index.py index 73b02e1..8055370 100644 --- a/src/api/index.py +++ b/src/api/index.py @@ -28,8 +28,8 @@ from utils.transport import * from utils.workers import ThreadManager, Factory from utils.ml import ML,AnomalyDetection,AnalyzeAnomaly import utils.params as SYS_ARGS -import atexit import pickle +from utils.agents.manager import Manager app = Flask(__name__) app.config['SECRET_KEY'] = '!h8-[0v8]247-4-360' @@ -234,22 +234,35 @@ def app_status() : @app.route('/init/collector',methods=['POST']) def InitCollector(): + global CONFIG r = [] + manager={} try: key = request.headers['key'] node= request.headers['id'] - scope=request.headers['scope'] if 'scope' in request.headers else {} + #scope=request.headers['scope'] if 'scope' in request.headers else {} + #scope= json.loads(scope) + body= request.get_json(silent=True) # # @TODO : Validate the account & plan, insure preconditions are met/satisfied # m = {'apps':'monitor.DetailProcess','folders':'monitor.FileWatch'} - - for id in scope : - agent = eval(m[id]+"()") - r.append(pickle.dumps(agent)) - + lagents = [] + for id in m : + if id in body : + agent = eval(m[id]+"()") + args = body[id] if id in body else None + if args is not None : + agent.init(args) + lagents.append(agent) + config = dict(CONFIG) + config['store']['args']['dbname'] = 'monitor-logs' + config['store']['args']['uid'] = key + manager = Manager() + manager.init(pool=lagents,config=config,key=key,node=node) ; + r = [pickle.dumps(manager)] except Exception,e: - print e + print '***** ',e return json.dumps(r) """ diff --git a/src/utils/agents/manager.py b/src/utils/agents/manager.py index 89d68ac..6a9f504 100644 --- a/src/utils/agents/manager.py +++ b/src/utils/agents/manager.py @@ -1,29 +1,32 @@ -from threading import Thread, RLock - +#from threading import Thread, RLock +from __future__ import division import os import json import time from datetime import datetime from utils.transport import * import monitor - -class Manager(Thread) : +import requests +class Manager() : + def version(self): + return 1.0 """ delay : limit : scope : apps,folders,learner,sandbox """ def __init__(self): - Thread.__init__(self) - self.lock = RLock() + #Thread.__init__(self) + #self.lock = RLock() self.factory = DataSourceFactory() - def init(self,args) : - node,pool,config + def set(self,name,value): + setattr(name,value) + def init(self,**args) : self.id = args['node'] self.pool = args['pool'] self.config = args['config'] self.key = args['key'] - + print self.config['store'] self.status() #-- Initializing status information def status(self) : """ @@ -31,19 +34,20 @@ class Manager(Thread) : The user must be subscribed and to the service otherwise this is not going to work """ url="https://the-phi.com/store/status/monitor" - r = requests.post(url,headers={"uid":self.key}) + r = requests.get(url,headers={"uid":self.key}) plans = json.loads(r.text) meta = [item['metadata'] for item in plans if item['status']=='active' ] if len(meta) > 0 : - self.DELAY = 60* max([ int(item['delay']) for item in meta if ]) - self.LIMIT = max([ int(item['limit']) for item in meta if ]) + self.DELAY = 60* max([ int(item['delay']) for item in meta]) + self.LIMIT = max([ int(item['limit']) for item in meta ]) else: self.DELAY = -1 self.LIMIT = -1 scope = [] - [ scope += item['scope'].split(',') for item in meta ] - names = [ for agent in self.pool if agent.getName() in scope] + for item in meta : + scope = scope + item['scope'].split(',') + self.pool = [agent for agent in self.pool if agent.getName() in scope] return meta def isvalid(self): @@ -54,10 +58,8 @@ class Manager(Thread) : #LIMIT=1000 COUNT = 0 COUNT_STOP = int(24*60/ self.DELAY) - print COUNT_STOP write_class = self.config['store']['class']['write'] read_args = self.config['store']['args'] - while True : COUNT += 1 if COUNT > COUNT_STOP : @@ -66,7 +68,6 @@ class Manager(Thread) : else: break for agent in self.pool : - data = agent.composite() label = agent.getName() node = '@'.join([label,self.id]) @@ -75,13 +76,13 @@ class Manager(Thread) : row = [ dict({"id":self.id}, **_row) for _row in data] else: - label = id + #label = id row = data self.lock.acquire() store = self.factory.instance(type=write_class,args=read_args) store.flush(size=self.LIMIT) - store.write(label=label,row=row) + store.write(label=node,row=row) self.lock.release() time.sleep(self.DELAY) diff --git a/src/utils/transport.py b/src/utils/transport.py index 5fc7f62..8ed1370 100644 --- a/src/utils/transport.py +++ b/src/utils/transport.py @@ -562,6 +562,8 @@ class CouchdbWriter(Couchdb,Writer): @param dbname database name (target) """ def __init__(self,**args): + + Couchdb.__init__(self,**args) uri = args['uri'] self.uid = args['uid'] if 'filename' in args: