diff --git a/requirements.txt b/requirements.txt index f88cedc..bfdf737 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,13 @@ +click==6.6 +couchdbkit==0.6.5 Flask==0.11.1 Flask-Session==0.3.0 +http-parser==0.8.3 +itsdangerous==0.24 Jinja2==2.8 MarkupSafe==0.23 +numpy==1.11.3 +pika==0.10.0 +restkit==4.2.2 +socketpool==0.5.3 Werkzeug==0.11.11 -argparse==1.2.1 -click==6.6 -itsdangerous==0.24 -wsgiref==0.1.2 diff --git a/src/api/index.py b/src/api/index.py index de7fa3d..028fcc8 100644 --- a/src/api/index.py +++ b/src/api/index.py @@ -10,13 +10,14 @@ from flask import Flask, session, request, redirect, Response from flask.templating import render_template from flask_session import Session - +import time import sys import os import json import re import monitor import Queue +from utils.transport import * PARAMS = {'context':''} if len(sys.argv) > 1: @@ -54,10 +55,14 @@ f.close() # # -from threading import Timer,Thread -ProcessQueue = Queue.LifoQueue() -mthread = monitor.Monitor(HANDLERS,ProcessQueue,'processes') +from threading import Thread, RLock +p = {'uri':'http://dev.the-phi.com:5984','dbname':'monitor','uid':'logs','filename':'logs.JSON'} +factory = DataSourceFactory() +gWriter = factory.instance(type='CouchdbWriter',args=p) +gReader = factory.instance(type='CouchdbReader',args=p) +mthread = monitor.Monitor(HANDLERS,gWriter,'processes',) mthread.start() + #(Timer(10,mthread.run)).start() #mthread = Process(target=monitor.Monitor,args=(HANDLERS,ProcessQueue,'processes')) #mthread.start() @@ -76,14 +81,21 @@ def procs(id): def trends (): id = request.args.get('id') # key = request.args.get('key') + global mthread + # mLock.acquire() + + time.sleep(2) + doc = gReader.read() + doc['row'] handler = monitor.mapreducer() - r = handler.filter(id,mthread.logs) - print r - if 'kate' in r: - for item in r['kate']: - print item['hour'],item['minute'] + r = handler.filter(id,logs) r = handler.run(r,handler.mapper,handler.reducer) - + # mLock.release() + if 'Google Chrome' in r: + for item in r['Google Chrome']: + print item['hour'],item['minute'] + + return json.dumps(r) @app.route('/dashboard') def dashboard(): diff --git a/src/monitor.py b/src/monitor.py index d346c7b..0e8aa28 100755 --- a/src/monitor.py +++ b/src/monitor.py @@ -201,35 +201,35 @@ class DetailProcess(Analysis): return ma class Monitor (Thread): - def __init__(self,pConfig,pQueue,id='processes') : + def __init__(self,pConfig,pWriter,id='processes') : Thread.__init__(self) - self.config = pConfig[id] - self.queue = pQueue; + self.config = pConfig[id] + self.writer = pWriter; self.logs = [] self.handler = self.config['class'] self.mconfig = self.config['config'] - self.lock = RLock() + + def stop(self): + self.keep_running = False def run(self): r = {} - - while True: + self.keep_running = True + lock = RLock() + while self.keep_running: + for label in self.mconfig: - - - self.handler.init(self.mconfig[label]) - r[label] = self.handler.composite() - - self.queue.put(r) - #self.logs.append(r) + lock.acquire() + self.handler.init(self.mconfig[label]) + r = self.handler.composite() + self.writer.write(label=label,row = r) + lock.release() - self.prune() - self.queue.task_done() - self.logs.append(self.queue.get(block=False)) + self.prune() HALF_HOUR = 60*1 time.sleep(HALF_HOUR) - + print "Stopped ..." def prune(self) : MAX_ENTRIES = 100 if len(self.logs) > MAX_ENTRIES :