-
-
Deletion/Archiving Plan
-
-
-
-
Powered By Machine Learning
-
+
+
+
Deletion/Archiving Plan
+
+
+
Powered By Machine Learning
+
+
-
-
By Age
-
-
- Approximately 00
-
-
-
-
-
By Size
+
+
By Age
+
-
+
00
+
Files
-
00
-
Files
-
-
- Approximately 00
-
+
+
+ Approximately 00
+
+
+
+
+
By Size
+
+
+
+ Approximately 00
+
+
-
-
+
+
+
diff --git a/src/monitor.py b/src/monitor.py
index fa0cc83..5da2e03 100755
--- a/src/monitor.py
+++ b/src/monitor.py
@@ -28,6 +28,8 @@ class Analysis:
def getNow(self):
d = datetime.datetime.now()
return {"month":d.month,"year":d.year, "day":d.day,"hour":d.hour,"minute":d.minute}
+ def getName(self):
+ return self.__class__.__name__
"""
This class is designed to analyze environment variables. Environment variables can either be folders, files or simple values
@@ -155,7 +157,8 @@ class DetailProcess(Analysis):
def init (self,names):
#Analysis.init(self)
self.names = names;
-
+ def getName(self):
+ return "apps"
def split(self,name,stream):
pattern = "(\d+.{0,1}\d*)\x20*(\d+.{0,1}\d*)\x20*(\d+.{0,1}\d*)".replace(":name",name).strip()
@@ -235,6 +238,8 @@ class FileWatch(Analysis):
pass
def init(self,folders):
self.folders = folders;
+ def getName(self):
+ return "folders"
def split(self,row):
x = row.split(' ')
diff --git a/src/utils/agents/__init__.py b/src/utils/agents/__init__.py
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/src/utils/agents/__init__.py
@@ -0,0 +1 @@
+
diff --git a/src/utils/agents/data-collector.py b/src/utils/agents/data-collector.py
new file mode 100644
index 0000000..4aeaace
--- /dev/null
+++ b/src/utils/agents/data-collector.py
@@ -0,0 +1,102 @@
+"""
+ This is the implementation of a data collection agent
+ The agent's role is intended to :
+ - collect data associated with folder and processes
+ - The agent will also perform various learning tasks
+
+ Usage:
+        python data-collector.py --path <config-file>
+                                 --delay xxx --procs p1,p2,p3 --folders path1,path2
+"""
+from threading import Thread, RLock
+from utils.params import PARAMS
+import os
+import json
+import time
+from datetime import datetime
+from utils.transport import *
+import monitor
+class ICollector(Thread) :
+
+ def __init__(self) :
+ Thread.__init__(self)
+ self.folders = None
+ self.procs = None
+ self.config = None
+ self.pool = []
+ self.lock = RLock()
+ self.factory = DataSourceFactory()
+ self.init()
+ def init(self):
+
+
+ #
+ # data store configuration (needs to be in a file)
+ #
+ path = PARAMS['path']
+ if os.path.exists(path) :
+ f = open(path)
+ self.config = json.loads(f.read())
+ #if 'store' in self.config :
+ # self.config = self.config['store']
+ f.close()
+ self.id = self.config['id'] #PARAMS['id']
+ if 'folders' in self.config : #PARAMS :
+ folders = self.config['folders'] #PARAMS['folders'].split(',')
+ self.register('monitor.FileWatch',folders)
+ if 'procs' in self.config : #PARAMS :
+ procs = self.config['procs'] #PARAMS['procs'].split(',')
+ self.register('monitor.DetailProcess',procs)
+
+ self.quit = False
+ #self.DELAY = PARAMS['delay']*60
+ self.DELAY = self.config['delay']
+
+ """
+ This function returns an instance of a data collector class :
+ ProcessDetails, FileWatch, ... provided the class name
+ """
+ def register(self,className,params) :
+ try:
+
+ agent = eval(className+"()")
+ agent.init(params)
+ self.pool.append( agent )
+ except Exception,e:
+ print e
+ def stop(self):
+ self.quit = True
+ def run(self):
+ write_class = self.config['store']['class']['write']
+ read_args = self.config['store']['args']
+
+ while self.quit == False:
+
+ for thread in self.pool :
+ id = "@".join([thread.getName(),self.id])
+
+ data = thread.composite()
+ label = thread.getName()
+ row = {}
+ if label == 'folders':
+ row[id] = data
+ else:
+ label = id
+ row = data
+ self.lock.acquire()
+ store = self.factory.instance(type=write_class,args=read_args)
+ store.write(label=label,row=row)
+ self.lock.release()
+ if 'MONITOR_CONFIG_PATH' in os.environ :
+ break
+ time.sleep(self.DELAY)
+
+ print ' *** Exiting ',self.name
+ # read_class=self.config['class']['read']
+ # store = self.factory.instance(type=write_class,args=read_args)
+ # store.flush()
+
+
+if __name__ == '__main__':
+ thread = ICollector()
+ # thread.daemon = True
+ thread.start()
\ No newline at end of file
diff --git a/src/utils/agents/learner.py b/src/utils/agents/learner.py
new file mode 100644
index 0000000..26c78bf
--- /dev/null
+++ b/src/utils/agents/learner.py
@@ -0,0 +1,83 @@
+"""
+ This file encapsulates a class that is intended to perform learning
+"""
+from __future__ import division
+import numpy as np
+from threading import Thread,RLock
+from utils.transport import *
+from utils.ml import AnomalyDetection,ML
+from utils.params import PARAMS
+import time
+"""
+ This class is intended to apply anomaly detection to various areas of learning
+ The areas of learning that will be skipped are :
+ ['_id','_rev','learn'] ...
+
+ @TODO:
+ - Find a way to perform dimensionality reduction if need be
+"""
+class Anomalies(Thread) :
+ def __init__(self,lock):
+ Thread.__init__(self)
+ path = PARAMS['path']
+ self.name = self.__class__.__name__.lower()
+ if os.path.exists(path) :
+ f = open(path)
+ self.config = json.loads(f.read())
+ f.close()
+
+ #
+ # Initializing data store & factory class
+ #
+ self.id = self.config['id']
+ self.apps = self.config['procs'] if 'procs' in self.config else []
+ self.rclass = self.config['store']['class']['read']
+ self.wclass = self.config['store']['class']['write']
+ self.rw_args = self.config['store']['args']
+ self.factory = DataSourceFactory()
+ self.quit = False
+ self.lock = lock
+ def format(self,stream):
+ pass
+ def stop(self):
+ self.quit = True
+    def run(self):
+        DELAY = self.config['delay'] * 60
+        reader = self.factory.instance(type=self.rclass,args=self.rw_args)
+        data = reader.read()
+        key = 'apps'
+        rdata = data[key]
+        features = ['memory_usage','cpu_usage']
+        yo = {"1":["running"],"name":"status"}
+        while self.quit == False :
+            print ' *** ',self.name, ' ' , str(datetime.today())
+            for app in self.apps:
+                print '\t',str(datetime.today()),' ** ',app
+                logs = ML.Filter('label',app,rdata)
+                if logs :
+                    handler = AnomalyDetection()
+                    value = handler.learn(logs,'label',app,features,yo)
+                    print value
+                    if value is not None:
+                        value = dict(value,**{"features":features})
+                        # NOTE(review): removed "r[id][app] = value" -- 'r' is never defined (NameError); result is persisted via writer below
+                        self.lock.acquire()
+                        writer = self.factory.instance(type=self.wclass,args=self.rw_args)
+                        writer.write(label='learn',row=value)
+                        self.lock.release()
+            #
+            if 'MONITOR_CONFIG_PATH' in os.environ :
+                break
+            time.sleep(DELAY)
+        print ' *** Exiting ',self.name.replace('a','A')
+
+
+
+
+class Regression(Thread):
+ def __init__(self,params):
+ pass
+if __name__ == '__main__' :
+ lock = RLock()
+ thread = Anomalies(lock)
+ thread.start()
\ No newline at end of file
diff --git a/src/utils/transport.py b/src/utils/transport.py
index 7b9bc11..bea8b61 100644
--- a/src/utils/transport.py
+++ b/src/utils/transport.py
@@ -10,6 +10,7 @@ import numpy as np
from couchdbkit import Server
import re
from csv import reader
+from datetime import datetime
"""
@TODO: Write a process by which the class automatically handles reading and creating a preliminary sample and discovers the meta data
"""
@@ -555,7 +556,24 @@ class CouchdbWriter(Couchdb,Writer):
document[label] = []
document[label].append(row)
self.dbase.save_doc(document)
-
+
+    def flush(self,params=None):
+        document = self.dbase.get(self.uid)
+        content = {}
+        _doc = {}
+        for id in document:
+            if id in ['_id','_rev','_attachments'] :
+                _doc[id] = document[id]
+            else:
+                content[id] = document[id]
+
+        content = json.dumps(content)
+        document= _doc
+        now = str(datetime.today())
+        # fixed: join only id and timestamp, then append extension (was "<id>-<ts>-.json")
+        name = '-'.join([document['_id'], now]) + '.json'
+        self.dbase.save_doc(document)
+        self.dbase.put_attachment(document,content,name,'application/json')
"""
This class acts as a factory to be able to generate an instance of a Reader/Writer
Against a Queue,Disk,Cloud,Couchdb