diff --git a/src/utils/agents/data-collector.py b/src/utils/agents/data-collector.py index 39fe528..ae1f0f8 100644 --- a/src/utils/agents/data-collector.py +++ b/src/utils/agents/data-collector.py @@ -85,7 +85,7 @@ class ICollector(Thread) : row = data self.lock.acquire() store = self.factory.instance(type=write_class,args=read_args) - + store.flush(size=200) store.write(label=label,row=row) self.lock.release() if 'MONITOR_CONFIG_PATH' in os.environ : diff --git a/src/utils/agents/learner.py b/src/utils/agents/learner.py index 4dbb15b..8cbf40a 100644 --- a/src/utils/agents/learner.py +++ b/src/utils/agents/learner.py @@ -3,6 +3,7 @@ """ from __future__ import division import numpy as np +from sklearn import linear_model from threading import Thread,RLock from utils.transport import * from utils.ml import AnomalyDetection,ML @@ -13,10 +14,17 @@ class BaseLearner(Thread): Thread.__init__(self) path = PARAMS['path'] self.name = self.__class__.__name__.lower() + self.rclass= None + self.wclass= None + self.rw_args=None if os.path.exists(path) : f = open(path) self.config = json.loads(f.read()) f.close() + self.rclass = self.config['store']['class']['read'] + self.wclass = self.config['store']['class']['write'] + self.rw_args = self.config['store']['args'] + else: self.config = None self.lock = lock @@ -98,7 +106,7 @@ class Anomalies(BaseLearner) : """ class Regression(BaseLearner): def __init__(self,lock): - BaseLearner.__init__(self) + BaseLearner.__init__(self,lock) self.folders = self.config['folders'] self.id = self.config['id'] def run(self): @@ -109,7 +117,12 @@ class Regression(BaseLearner): data = ML.Filter('id',self.id,data['folders']) xo = ML.Extract(['date'],data) yo = ML.Extract(['count'],data) - numpy.linalg.lstsq(xo, yo, rcond=-1) + + + pass + # print np.var(xo,yo) + + diff --git a/src/utils/ml.py b/src/utils/ml.py index ddae91c..4c56910 100644 --- a/src/utils/ml.py +++ b/src/utils/ml.py @@ -42,7 +42,12 @@ class ML: def Extract(lattr,data): if isinstance(lattr,basestring): lattr = [lattr] - return [[row[id] for id in lattr] for row in data] + # return [[row[id] for id in lattr] for row in data] + r = [[row[id] for id in lattr] for row in data] + if len(lattr) == 1 : + return [x[0] for x in r] + else: + return r @staticmethod def CleanupName(value) : return value.replace('$','').replace('.+','') diff --git a/src/utils/transport.py b/src/utils/transport.py index eee6265..b26849c 100644 --- a/src/utils/transport.py +++ b/src/utils/transport.py @@ -562,8 +562,25 @@ class CouchdbWriter(Couchdb,Writer): document[label] = [] document[label].append(row) self.dbase.save_doc(document) - - def flush(self,params=None): + def flush(self,params) : + + size = params['size'] + + document = self.dbase.get(self.uid) + for key in documment: + if key not in ['_id','_rev','_attachments'] : + content = document[key] + else: + content = [] + if isinstance(content,list): + index = len(content) - size + content = content[index:] + document[key] = content + else: + document[key] = {} + self.dbase.save_doc(document) + + def archive(self,params=None): document = self.dbase.get(self.uid) content = {} _doc = {}