"""Unit tests for the ``monitor`` module's Env, DetailProcess,
ProcessCounter, Sandbox and mapreducer helpers.

NOTE(review): these tests inspect the live host (environment variables,
running processes), so the expected values are environment-dependent.
"""
from __future__ import division

import os
import unittest

import monitor
from monitor import Env, DetailProcess, ProcessCounter, Sandbox


class TestMonitorServer(unittest.TestCase):
    """Exercise each monitor handler through its init()/composite() pair."""

    def test_Environment(self):
        """
        This test case is designed to test the existence of a resource set
        as an environment variable. This applies to files, folders
        (not values).
        """
        p = Env()
        p.init(['PATH', 'HOME', 'SHELL'])
        r = p.composite()
        value = r['value']
        self.assertGreater(value, 0)
        # The score is a float: compare with a tolerance rather than with
        # exact equality, which is brittle under floating-point rounding.
        # NOTE(review): 2/3 presumably means two of the three variables
        # resolve to an existing file/folder -- confirm against Env.
        self.assertAlmostEqual(value, 2 / 3)
        self.assertEqual(p.evaluate('PATH'), 0)

    def test_RunningProcess(self):
        """composite() must return a truthy result for the watched names."""
        p = DetailProcess()
        p.init(['rabbitmq-server', 'python', 'apache2'])
        r = p.composite()
        self.assertTrue(r)

    def test_ProcessCount(self):
        """At least one watched process is counted; 'foo' is counted as 0."""
        p = ProcessCounter()
        p.init(['foo', 'apache2', 'VBoxClient', 'rabbitmq-server', 'python'])
        r = p.composite()
        self.assertGreater(sum(r.values()), 0)
        self.assertEqual(r['foo'], 0)

    def test_VirtualEnv(self):
        """Smoke test: Sandbox.composite() runs without raising.

        The sandbox and requirements paths are resolved relative to the
        parent of $PYTHONPATH; the test is skipped when it is not set.
        """
        python_path = os.environ.get('PYTHONPATH')
        if not python_path:
            # Report a missing precondition explicitly instead of failing
            # with an opaque KeyError.
            self.skipTest("PYTHONPATH is not set")
        requirements_path = os.path.join(python_path, "..", "requirements.txt")
        sandbox_path = os.path.join(python_path, "..", 'sandbox')
        p = Sandbox()
        p.init({"sandbox": sandbox_path, "requirements": requirements_path})
        # No assertion on the result: its shape is not visible from here.
        p.composite()

    def test_map(self):
        """Filter the logs by the 'test' key and run them through the mapper.

        The result is only printed; there is no assertion on its content.
        """
        p = DetailProcess()
        p.init(['rabbitmq-server', 'python', 'apache2'])
        r = {"test": p.composite()}
        logs = [r, {"x-test": p.composite()}]

        def mapper(row, emit):
            # Emit every item of the row keyed by its 'label' field.
            for item in row:
                emit(item['label'], item)

        def reducer(values):
            # Keep only the tail of the values (at most the last 101).
            # Currently unused: run() below is called with None as reducer.
            end = len(values) - 1
            beg = 0 if end < 100 else end - 100
            return values[beg:]

        mrh = monitor.mapreducer()
        logs = mrh.filter('test', logs)
        # Single-argument print() behaves the same on Python 2 and Python 3.
        print(mrh.run(logs, mapper, None))


if __name__ == '__main__':
    unittest.main()