|
|
|
@ -1,29 +1,32 @@
|
|
|
|
|
from threading import Thread, RLock
|
|
|
|
|
|
|
|
|
|
#from threading import Thread, RLock
|
|
|
|
|
from __future__ import division
|
|
|
|
|
import os
|
|
|
|
|
import json
|
|
|
|
|
import time
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
from utils.transport import *
|
|
|
|
|
import monitor
|
|
|
|
|
|
|
|
|
|
class Manager(Thread) :
|
|
|
|
|
import requests
|
|
|
|
|
class Manager() :
|
|
|
|
|
def version(self):
|
|
|
|
|
return 1.0
|
|
|
|
|
"""
|
|
|
|
|
delay : <value>
|
|
|
|
|
limit : <value>
|
|
|
|
|
scope : apps,folders,learner,sandbox
|
|
|
|
|
"""
|
|
|
|
|
def __init__(self):
|
|
|
|
|
Thread.__init__(self)
|
|
|
|
|
self.lock = RLock()
|
|
|
|
|
#Thread.__init__(self)
|
|
|
|
|
#self.lock = RLock()
|
|
|
|
|
self.factory = DataSourceFactory()
|
|
|
|
|
def init(self,args) :
|
|
|
|
|
node,pool,config
|
|
|
|
|
def set(self,name,value):
|
|
|
|
|
setattr(name,value)
|
|
|
|
|
def init(self,**args) :
|
|
|
|
|
self.id = args['node']
|
|
|
|
|
self.pool = args['pool']
|
|
|
|
|
self.config = args['config']
|
|
|
|
|
self.key = args['key']
|
|
|
|
|
|
|
|
|
|
print self.config['store']
|
|
|
|
|
self.status() #-- Initializing status information
|
|
|
|
|
def status(self) :
|
|
|
|
|
"""
|
|
|
|
@ -31,19 +34,20 @@ class Manager(Thread) :
|
|
|
|
|
The user must be subscribed and to the service otherwise this is not going to work
|
|
|
|
|
"""
|
|
|
|
|
url="https://the-phi.com/store/status/monitor"
|
|
|
|
|
r = requests.post(url,headers={"uid":self.key})
|
|
|
|
|
r = requests.get(url,headers={"uid":self.key})
|
|
|
|
|
plans = json.loads(r.text)
|
|
|
|
|
|
|
|
|
|
meta = [item['metadata'] for item in plans if item['status']=='active' ]
|
|
|
|
|
if len(meta) > 0 :
|
|
|
|
|
self.DELAY = 60* max([ int(item['delay']) for item in meta if ])
|
|
|
|
|
self.LIMIT = max([ int(item['limit']) for item in meta if ])
|
|
|
|
|
self.DELAY = 60* max([ int(item['delay']) for item in meta])
|
|
|
|
|
self.LIMIT = max([ int(item['limit']) for item in meta ])
|
|
|
|
|
else:
|
|
|
|
|
self.DELAY = -1
|
|
|
|
|
self.LIMIT = -1
|
|
|
|
|
scope = []
|
|
|
|
|
[ scope += item['scope'].split(',') for item in meta ]
|
|
|
|
|
names = [ for agent in self.pool if agent.getName() in scope]
|
|
|
|
|
for item in meta :
|
|
|
|
|
scope = scope + item['scope'].split(',')
|
|
|
|
|
self.pool = [agent for agent in self.pool if agent.getName() in scope]
|
|
|
|
|
return meta
|
|
|
|
|
|
|
|
|
|
def isvalid(self):
|
|
|
|
@ -54,10 +58,8 @@ class Manager(Thread) :
|
|
|
|
|
#LIMIT=1000
|
|
|
|
|
COUNT = 0
|
|
|
|
|
COUNT_STOP = int(24*60/ self.DELAY)
|
|
|
|
|
print COUNT_STOP
|
|
|
|
|
write_class = self.config['store']['class']['write']
|
|
|
|
|
read_args = self.config['store']['args']
|
|
|
|
|
|
|
|
|
|
while True :
|
|
|
|
|
COUNT += 1
|
|
|
|
|
if COUNT > COUNT_STOP :
|
|
|
|
@ -66,7 +68,6 @@ class Manager(Thread) :
|
|
|
|
|
else:
|
|
|
|
|
break
|
|
|
|
|
for agent in self.pool :
|
|
|
|
|
|
|
|
|
|
data = agent.composite()
|
|
|
|
|
label = agent.getName()
|
|
|
|
|
node = '@'.join([label,self.id])
|
|
|
|
@ -75,13 +76,13 @@ class Manager(Thread) :
|
|
|
|
|
row = [ dict({"id":self.id}, **_row) for _row in data]
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
label = id
|
|
|
|
|
#label = id
|
|
|
|
|
row = data
|
|
|
|
|
|
|
|
|
|
self.lock.acquire()
|
|
|
|
|
store = self.factory.instance(type=write_class,args=read_args)
|
|
|
|
|
store.flush(size=self.LIMIT)
|
|
|
|
|
store.write(label=label,row=row)
|
|
|
|
|
store.write(label=node,row=row)
|
|
|
|
|
self.lock.release()
|
|
|
|
|
time.sleep(self.DELAY)
|
|
|
|
|
|
|
|
|
|