from __future__ import division import unittest from monitor import Env, DetailProcess, ProcessCounter, Sandbox, FileWatch import monitor import os import json #from utils.workers import Top, Learner #from multiprocessing import Lock from threading import Lock path = os.environ['MONITOR_CONFIG_PATH'] f = open(path) CONFIG = json.loads( f.read()) f.close() class TestMonitorServer(unittest.TestCase): def test_Environment(self): """ This test case is designed to test the existance of a resource set as an environment variable. This applies to files, folders (not values) """ p = Env() p.init(['PATH','HOME','SHELL']) r = p.composite() value = r['value'] self.assertTrue(value > 0 and value >= (100*2/3),value) self.assertTrue(p.evaluate('PATH') == 0) def test_RunningProcess(self): p = DetailProcess() #p.init(['kate','rabbitmq-server','python','apache2','firefox']) p.init(CONFIG['apps']) r = p.composite() #for row in r: # print row['user'],row['pid'],row['label'],row['status'],' mem ',row['memory_usage'],' cpu ',row['cpu_usage'] self.assertTrue(r) def test_ProcessCount(self): p= ProcessCounter() p.init(['foo','apache2','VBoxClient','rabbitmq-server','python']) r = p.composite() self.assertTrue( sum(r.values()) > 0 ) self.assertTrue( r['foo'] == 0) def test_VirtualEnv(self): requirements_path = os.sep.join([os.environ['PYTHONPATH'],"..","requirements.txt"]) sandbox_path = os.sep.join([os.environ['PYTHONPATH'],"..",'sandbox']) p = Sandbox() p.init({"sandbox":sandbox_path,"requirements":requirements_path}) p.composite() def test_StartTop(self): pass #lock = Lock() #p = Top(CONFIG,lock) #p.start() #p.join() def test_StartLearner(self): pass #lock = Lock() #p = Learner(CONFIG,lock) #p.start() def test_FileWatch(self): #conf =CONFIG['monitor']['folders'] #path =os.environ['FILE_PATH'] fw = FileWatch() print fw.evaluate('/Users/steve/Music') if __name__ == '__main__' : unittest.main()