From 1a9c4b6630e67868d6b024944ce759ef07eec0e0 Mon Sep 17 00:00:00 2001
From: Steve Nyemba
Date: Tue, 10 Nov 2020 13:21:36 -0600
Subject: [PATCH] removing unused packages

---
 src/data-collector.py        | 134 -------
 src/monitor.py               | 174 ---------
 src/utils/__init__.py        |   1 -
 src/utils/agents/__init__.py |   1 -
 src/utils/agents/actor.py    | 368 ------------------
 src/utils/agents/learner.py  | 132 -------
 src/utils/agents/manager.py  | 220 -----------
 src/utils/params.py          |  32 --
 src/utils/services.py        | 598 -----------------------------
 src/utils/transport.py       | 709 -----------------------------------
 10 files changed, 2369 deletions(-)
 delete mode 100755 src/data-collector.py
 delete mode 100755 src/monitor.py
 delete mode 100755 src/utils/__init__.py
 delete mode 100755 src/utils/agents/__init__.py
 delete mode 100755 src/utils/agents/actor.py
 delete mode 100755 src/utils/agents/learner.py
 delete mode 100755 src/utils/agents/manager.py
 delete mode 100755 src/utils/params.py
 delete mode 100755 src/utils/services.py
 delete mode 100755 src/utils/transport.py

diff --git a/src/data-collector.py b/src/data-collector.py
deleted file mode 100755
index 1df9474..0000000
--- a/src/data-collector.py
+++ /dev/null
@@ -1,134 +0,0 @@
-h="""
-    This is a data-collector client, that is intended to perform data-collection operations and submit them to an endpoint
-    @required:
-        - key   application/service key
-        - id    node identifier
-    usage :
-        python data-collector.py --path config.json
-    The configuration file is structured as a JSON object as follows :
-    {
-        id : identifier
-        key: customer's identification key,
-        api: http://localhost/monitor/1/client
-        folders:[]
-    }
-    NOTE: You can download a sample configuration file from https://the-phi.com/smart-top
-"""
-from utils.params import PARAMS as SYS_ARGS, Logger
-import os
-import requests
-import json
-# from threading import Thread, RLock
-from monitor import Apps, Folders
-import time
-from datetime import datetime
-class Collector :
-    def __init__(self) :
-        """
-        The configuration file is passed to the class for basic initialization of variables
-        """
-        self.httpclient = requests.Session()
-
-    def init(self):
-        # if 'folders' not in SYS_ARGS :
-        #     #
-        #     # If nothing is set it will monitor the temporary directory
-        #     self.locations = [os.environ[name] for name in ['TEMP','TMP','TMPDIR'] if name in os.environ]
-        # else:
-        #     self.locations = SYS_ARGS['folders']
-
-        #
-        # -- let's get the list of features we are interested in
-        url = SYS_ARGS['api']+'/1/client/login'
-        key = SYS_ARGS['key']
-        self.id = SYS_ARGS['id'] if 'id' in SYS_ARGS else os.environ['HOSTNAME']
-        headers = {"key":key,"id":self.id}
-
-        #
-        #-- what features are allowed
-        r = self.httpclient.post(url,headers=headers)
-
-        if r.status_code == 200 :
-            r = r.json()
-
-            self.features = r['features']
-            self.config = r['config'] #-- contains apps and folders
-            Logger.log(action="login",value=r)
-        else:
-            self.features = None
-            self.config = None
-            Logger.log(action='login',value='error')
-    def callback(self,channel,method,header,stream):
-        pass
-    def listen(self):
-
-        factory = DataSourceFactory()
-
-        # self.qlistener = factory.instance(type="QueueListener",args=_args)
-        # self.qlistener.callback = self.callback
-        # self.qlistener.init(SYS_ARGS['id'])
-    def post(self,**args) :
-        """
-        This function will post data to the endpoint
-        """
-        url = SYS_ARGS['api']+'/1/client/log'
-        key = SYS_ARGS['key']
-        id = SYS_ARGS['id'] if 'id' in SYS_ARGS else os.environ['HOSTNAME']
-        headers = {"key":key,"id":id,"context":args['context'],"content-type":"application/json"}
-
-        body = args['data'].fillna('').to_json(orient='records')
-
-        if args['data'].shape[0] > 0 :
-            r = self.httpclient.post(url,headers=headers,data=body)
-            Logger.log(action="post."+args['context'],value=r.status_code)
-        else:
-            Logger.log(action="data.error",value="no data :: "+args['context'])
-
-    def run(self):
-        """
-        This function will execute the basic functions to monitor folders and apps running on the system,
-        given the configuration specified on the server .
-        """
-        while True :
-            try:
-                self.init()
-                if self.config and self.features :
-                    ELAPSED_TIME = 60* int(self.features['schedule'].replace("min","").strip())
-                    if 'apps' in self.config :
-                        self.post( data=(Apps(node=self.id)).get(filter=self.config['apps']),context="apps")
-                    if 'folders' in self.config and self.config['folders'] :
-                        folder = Folders(node=self.id)
-                        f = folder.get(path=self.config['folders'])
-                        self.post(data = f ,context="folders")
-
-                    Logger.log(action='sleeping',value=ELAPSED_TIME)
-                #
-                # In case no configuration is provided, the system will simply fall asleep and wait
-                # @TODO: Evaluate whether to wake up the system or not (security concerns)!
-                #
-                time.sleep(ELAPSED_TIME)
-
-            except Exception,e:
-                Logger.log(action='error',value=e.message)
-                print e
-                #break
-
-            pass
-if __name__ == '__main__' :
-    #
-    #
-    if 'path' in SYS_ARGS :
-        path = SYS_ARGS['path']
-        f = open(path)
-        SYS_ARGS = json.loads(f.read())
-        f.close()
-
-    Logger.init('data-collector')
-    collector = Collector()
-    collector.run()
-else:
-    print (h)
diff --git a/src/monitor.py b/src/monitor.py
deleted file mode 100755
index f81ee96..0000000
--- a/src/monitor.py
+++ /dev/null
@@ -1,174 +0,0 @@
-"""
-    Steve L. Nyemba
-    The Phi Technology - Smart Top
-
-    This program is the core for evaluating folders and applications. Each class is specialized to generate a report in a pandas data-frame
-    The classes will focus on Apps, Folders and Protocols
-        SmartTop.get(**args)
-    @TODO:
-        Protocols (will be used in anomaly detection)
-"""
-from __future__ import division
-import os
-import subprocess
-import numpy as np
-import sys
-import pandas as pd
-import datetime
-class SmartTop:
-    def __init__(self,**args):
-        self.node = args['node']
-    def get(self,**args):
-        return None
-
-class Apps(SmartTop) :
-    def __init__(self,**args):
-        """
-        This class will process a system command and parse the output accordingly given a parser
-        @param parse is a parser pointer
-        """
-        SmartTop.__init__(self,**args)
-        self.cmd = "ps -eo pid,user,pmem,pcpu,stat,etime,args|awk 'OFS=\";\" {$1=$1; if($5 > 9) print }'"
-        self.xchar = ';'
-
-    def get_app(self,stream):
-        index = 1 if os.path.exists(" ".join(stream[:1])) else len(stream)-1
-
-        cmd = " ".join(stream[:index]) if index > 0 else " ".join(stream)
-
-        if ' ' in cmd.split('/')[len(cmd.split('/'))-1] :
-            p = cmd.split('/')[len(cmd.split('/'))-1].split(' ')
-            name = p[0]
-            args = " ".join(p[1:])
-        else:
-            name = cmd.split('/')[len(cmd.split('/'))-1]
-            args = " ".join(stream[index:]) if index > 0 else ""
-        return [name,cmd,args]
-    def to_pandas(self,m):
-        """
-        This function will convert the output of ps to a data-frame
-        @param m raw matrix i.e list of values like a csv
-        """
-        d = datetime.datetime.now().strftime('%m-%d-%Y')
-        t = datetime.datetime.now().strftime('%H:%M:%S')
-        m = [item for item in m if len(item) != len (m[0])]
-        m = "\n".join(m[1:])
-        df = pd.read_csv(pd.compat.StringIO(m),sep=self.xchar)
-        df['date'] = np.repeat(d,df.shape[0])
-        df['time'] = np.repeat(t,df.shape[0])
-        df['node'] = np.repeat(self.node,df.shape[0])
-        df.columns =['pid','user','mem','cpu','status','started','name','cmd','args','date','time','node']
-        return df
-    def empty(self,name):
-        return pd.DataFrame([{"pid":None,"user":None,"mem":0,"cpu":0,"status":"X","started":None,"name":name,"cmd":None,"args":None,"date":None,"time":None,"node":self.node}])
-    def parse(self,rows):
-        m = []
-        TIME_INDEX = 5
-        ARGS_INDEX = 6
-
-        for item in rows :
-            if rows.index(item) != 0 :
-                parts = item.split(self.xchar)
-                row = parts[:TIME_INDEX]
-                row.append(' '.join(parts[TIME_INDEX:ARGS_INDEX]))
-                row += self.get_app(parts[ARGS_INDEX:])
-            else:
-                row = item.split(self.xchar)
-            row = (self.xchar.join(row)).strip()
-            if len(row.replace(";","")) > 0 :
-                m.append(row)
-
-        return m
-    def get(self,**args):
-        """
-        This function returns the output of a command to the calling code that is piped into the class
-        The output will be stored in a data frame with columns
-        @
-        """
-        try:
-            handler = subprocess.Popen(self.cmd,shell=True,stdout=subprocess.PIPE)
-            stream = handler.communicate()[0]
-            rows = stream.split('\n')
-            df = self.to_pandas(self.parse(rows))
-            r = pd.DataFrame()
-            if 'filter' in args :
-                pattern = "|".join(args['filter'])
-                i = df.cmd.str.contains(pattern)
-                r = df[i].copy()
-                r.index = np.arange(0,r.shape[0])
-                ii= (1 + np.array(i)*-1) == 1
-                other = pd.DataFrame(df[ii].sum()).T.copy()
-                other.index = np.arange(0,other.shape[0])
-                other.user = other.name = other.status = other.cmd = other.args = 'other'
-                other.started = other.pid = -1
-                other = other[other.columns[1:]]
-                for name in args['filter'] :
-                    i = r.cmd.str.contains(str(name.strip()),case=False,na=False)
-                    if i.sum() == 0:
-                        r = r.append(self.empty(name),sort=False)
-                    else :
-                        pass
-                    # r[i].update (pd.DataFrame({"name":np.repeat(name,r.shape[0])}))
-                    r.loc[i, 'name'] = np.repeat(name,i.sum())
-                    # r.loc[i].name = name
-
-                r = r.append(other,sort=False)
-                r.index = np.arange(r.shape[0])
-
-            return r
-        except Exception,e:
-            print (e)
-            return None
-
-class Folders(SmartTop):
-    """
-    This class will assess a folder and produce a report in a data-frame that can be later on used for summary statistics
-    """
-    def __init__(self,**args):
-        SmartTop.__init__(self,**args)
-
-    def _get(self,dir_path,r=[]):
-        for child in os.listdir(dir_path):
-            path = os.path.join(dir_path, child)
-            if os.path.isdir(path):
-                self._get(path,r)
-            else:
-                size = os.path.getsize(path)
-                file_date = os.path.getatime(path)
-                file_date = datetime.datetime.fromtimestamp(file_date)
-                now = datetime.datetime.now()
-                age = (now - file_date ).days
-                name = os.path.basename(path)
-                r.append({"name":name,"path":path,"size":size,"age":age,"date":now.strftime('%m-%d-%Y'),"time":now.strftime('%H:%M:%S'),"node":self.node })
-        return r
-
-    def get(self,**args):
-        # path = args['path']
-        if isinstance(args['path'],list) == False:
-            paths = [args['path']]
-        else:
-            paths = args['path']
-        paths = paths
-        _out = pd.DataFrame()
-        for path in paths :
-            name = os.path.basename(path)
-            if os.path.exists(path) :
-                #
-                # If the folder does NOT exist it should not be treated.
-                #
-                rows = self._get(path)
-                if len(rows) > 0 :
-                    r = pd.DataFrame(rows)
-                    r = pd.DataFrame([{"name":name,"path":path,"files":r.shape[0],"age_in_days":r.age.mean(),"size_in_kb":r['size'].sum(),"date":r.date.max(),"time":r.time.max(),"node":r.node.max()}])
-                    _out = _out.append(r,sort=False)
-        #
-        # @TODO: The state of the hard-drive would be a good plus
-        #   os.system('df -h /')
-        _out.index = np.arange(0,_out.shape[0])
-        return _out
diff --git a/src/utils/__init__.py b/src/utils/__init__.py
deleted file mode 100755
index 8b13789..0000000
--- a/src/utils/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-
diff --git a/src/utils/agents/__init__.py b/src/utils/agents/__init__.py
deleted file mode 100755
index 8b13789..0000000
--- a/src/utils/agents/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-
diff --git a/src/utils/agents/actor.py b/src/utils/agents/actor.py
deleted file mode 100755
index 864ae5f..0000000
--- a/src/utils/agents/actor.py
+++ /dev/null
@@ -1,368 +0,0 @@
-"""
-    This class is designed to be an actor class i.e it will undertake certain actions given an event detected
-    The platform has 2 main sections (detection & analysis).
-    Action Types (Actors):
-        - Alert : Sends an email or Webhook
-        - Apps : Kill, Start
-        - Folder: Archive, Delete (all, age, size)
-    By design we are to understand that a message is structured as follows:
-        {to,from,content} with content either being an arbitrary stream (or JSON)
-    @TODO:
-        - upgrade to python 3.x
-"""
-import json
-from threading import Thread
-import os
-import shutil
-import subprocess
-import re
-from monitor import ProcessCounter
-from utils.transport import QueueListener, QueueWriter, QueueReader
-from utils.params import PARAMS
-import smtplib
-from email.mime.multipart import MIMEMultipart
-from email.mime.text import MIMEText
-from datetime import datetime
-from StringIO import StringIO
-from utils.services import Dropbox, Google
-class Actor():
-    @staticmethod
-    def instance(name,args,logger=None):
-        """
-        This function is a singleton that acts as a factory object for all the instances of this subclass
-        @param name name of the class to instantiate
-        @param args arguments to be passed in {configuration}
-        """
-        r = []
-        if not isinstance(name,list):
-            name = [name]
-        for id in name :
-            try:
-                o = eval("".join([id,"()"]))
-                o.Initialize(args,logger)
-                r.append(o)
-            except Exception,e:
-                if logger is not None :
-                    logger.log(subject='Actor',object='Factory',action='error',value=e.message)
-                print str(e)
-        return r[0] if len(r) == 1 else r
-    def __init__(self):
-        """
-        Initializing the class with configuration. The configuration will be specific to each subclass
-        @param args arguments the class needs to be configured
-        """
-        pass
-    def getName(self):
-        return self.__class__.__name__.lower()
-
-    # def getIdentifier(self):
-    #     return self.__class__.__name__.lower()
-
-    def Initialize(self,args,logger=None):
-        self.config = args
-        self.logger = logger
-
-    def isValid(self,**item):
-        return False
-
-    def execute(self,cmd):
-        stream = None
-        try:
-            # subprocess.call (cmd,shell=False)
-            out = subprocess.Popen(cmd,stdout=subprocess.PIPE)
-            #stream = handler.communicate()[0]
-        except Exception,e:
-            pass
-    def post(self,**args):
-        pass
-    def log(self,**args):
-        if self.logger :
-            args['subject'] = self.getName()
-            self.logger.log(args)
-class Apps(Actor) :
-    """
-    This class is designed to handle application restarts, if need be.
-    conf{app-name:{args}}
-    """
-    def __init__(self):
-        Actor.__init__(self)
-        # self.ng = None
-
-    def isValid(self,**args):
-        """
-        We ensure that the provided application exists and that the payload is correct
-        The class will only respond to reboot,kill,start actions
-            p validate the payload
-            q validate the app can be restarted
-
-        @NOTE: killing the application has no preconditions/requirements
-        """
-        params = args['params']
-        action = args['action']
-        p = len(set(params.keys()) & set(['cmd','label'])) == 2
-        q = False
-        r = action in ['reboot','kill','start']
-        if p :
-            q = os.path.exists(params['cmd'])
-        return p and q and r
-    def init(self,action,params) :
-        """
-        This function will initialize the actor with applications and associated arguments
-        @param args {"apps_o":"","app_x":params}
-        """
-        self.action = action
-        self.params = params
-        self.log(action='init',object=action,value=params)
-
-    def startup(self,cmd) :
-        """
-        This function is intended to start a program given the configuration
-        @TODO We need to find the command in case the app has crashed
-        """
-        try:
-            print""
-            print cmd
-            os.system(cmd +" &")
-            self.log(action='startup',value=cmd)
-        except Exception, e:
-            print e
-
-    def kill(self,name) :
-        """
-        Kill processes given the name. The function will not be case sensitive and partial names are accepted
-        @NOTE: Make sure the reference to the app is not ambiguous
-        """
-        try:
-            args = "".join(['ps -eo pid,command|grep -Ei "',name.lower(),'"|grep -E "^ {0,}[0-9]+" -o|xargs kill -9'])
-            #self.execute([args])
-            subprocess.call([args],shell=True)
-        except Exception,e:
-            print e
-
-    def run(self):
-        __action = str(self.action).strip()
-        __params = dict(self.params)
-        pointer = None
-        if __action == 'reboot' :
-            def pointer():
-                self.kill(__params['label'])
-                self.startup(__params['cmd'])
-        elif __action == 'kill':
-            def pointer():
-                self.kill(__params['label'])
-        elif __action =='start':
-            def pointer() :
-                self.startup(__params['cmd'])
-        if pointer :
-            thread = Thread(target=pointer)
-            thread.start()
-            # pointer()
-    def analyze(self,logs) :
-        """
-        This function is designed to analyze a few logs and take appropriate action
-        @param logs logs of application/process data; folder analysis or sandbox analysis
-        """
-        pass
-        # for item in logs :
-        #     name = item['label']
-        #     if self.can_start(name) :
-        #         self.startup(name)
-        # #
-
-class Mailer (Actor):
-    """
-    This class is a mailer agent
-    """
-    def __init__(self):
-        Actor.__init__(self)
-        """
-        conf = {uid:,host:,port:,password:}
-        """
-    def init(self,conf) :
-        self.uid = conf['uid']
-
-        try:
-            self.handler = smtplib.SMTP_SSL(conf['host'],conf['port'])
-            r = self.handler.login(self.uid,conf['password'])
-            #
-            # @TODO: Check the status of the authentication
-            # If not authenticated the preconditions have failed
-            #
-        except Exception,e:
-            print str(e)
-            self.handler = None
-            pass
-
-    def send(self,**args) :
-        subject = args['subject']
-        message = args['message']
-        to = args['to']
-        if '<' in message and '>' in message :
-            message = MIMEText(message,'html')
-        else:
-            message = MIMEText(message,'plain')
-        message['From'] = self.uid
-        message['To'] = to
-        message['Subject'] = subject
-        return self.handler.sendmail(self.uid,to,message.as_string())
-    def close(self):
-        self.handler.quit()
-
-class Folders(Actor):
-    def __init__(self):
-        Actor.__init__(self)
-        """
-        This is designed to handle folders i.e cleaning/archiving the folders
-        If the user does NOT have any keys to cloud-view then she will not be able to archive
-        {threshold:value}
-        @params threshold in terms of size, or age. It will be applied to all folders
-        """
-    def init(self,action,params):
-        self.action = action
-        # print args
-        # def init(self,args):
-        """
-        This is initialized with parameters from the plan.
-        The parameters should be specific to the actor (folder)
-            folder_size
-        """
-        #self.lfolders = args['folders'] #config['folders']
-        #self.action = args['action'] #{clear,archive} config['actions']['folders']
-        plan = params['plan']
-        self.threshold = self.get_size( plan['folder_size']) #self.config['threshold'])
-        # self.action = action
-        self.params = params
-
-    def isValid(self,**args):
-        action = args['action']
-        params = args['params']
-        p = len(set(action) & set(['clean','archive','backup'])) > 0
-        q = False
-        r = False
-        if p :
-            q = len(set(params.keys()) & set( ['label','folder'])) > 0
-            if q :
-                folder = params['label'] if 'label' in params else params['folder']
-                r = os.path.exists(folder)
-        return p and q and r
-    def archive(self,item):
-        """
-        This function will archive all files in a given folder
-        @pre : isValid
-        """
-        folder = item['label']
-        name = folder.split(os.sep)
-        name = name[len(name)-1]
-        date = str(datetime.now()).replace(' ','@')#''.join([str(i) for i in item['date'].values()])
-        # signature='-'.join([name,date,str(item['stats']['file_count']),'files'])
-        signature='-'.join([name,date])
-        tarball=os.sep.join([folder,'..',signature])
-        shutil.make_archive(tarball,'tar',folder)
-        #self.clean(item)
-        #
-        # @TODO: The archive can be uploaded to the cloud or else where
-        # @param id   cloud service identifier {dropbox,box,google-drive,one-drive}
-        # @param key  authorization key for the given service
-        #
-        pass
-        return tarball+".tar"
-    def backup(self,tarball):
-        """
-        This function will initiate backup to the cloud given
-        """
-        if os.path.exists(tarball) :
-            key = self.params['key']
-            sid = self.params['user']['sid']
-            if sid == 'dropbox' :
-                cloud = Dropbox()
-            elif sid == 'google-drive' :
-                cloud = Google()
-            cloud.init(key)
-            file = open(tarball)
-            out = cloud.upload('backup','application/octet-stream',file)
-            file.close()
-            print out
-            pass
-        else:
-            pass
-        print tarball
-        print self.params['user']['sid']
-        print self.params['key']
-        #
-        # let's upload to the cloud
-        pass
-    def clean(self,item):
-        """
-        This function consists in deleting files from a given folder
-        """
-        rpath = item['label']
-        files = os.listdir(item['label'])
-        for name in list(files) :
-            path = os.sep.join([item['label'],name])
-            if os.path.isdir(path) :
-                shutil.rmtree(path)
-            else:
-                os.remove(path)
-        #
-        #
-
-    def get_size(self,value):
-        """
-        Converts size values into MB and returns the value without units
-        """
-        units = {'MB':1000,'GB':1000000,'TB':1000000000} # converting to kb
-        key = set(units.keys()) & set(re.split('(\d+)',value.replace(' ','').upper()))
-        if len(key) == 0:
-            return -1
-        key = key.pop()
-        return float(value.upper().replace('MB','').strip()) * units[key]
-
-    def can_clean(self,item):
-        """
-        This function returns whether the following :
-            p : folder exists
-            q : has reached threshold
-
-        @TODO: Add a user defined configuration element to make this happen
-        """
-        p = os.path.exists(item['label']) and item['label'] in self.lfolders
-        q = item['stats']['size']['mean'] >= self.threshold and self.threshold > 0
-        return p and q
-    def run(self):
-        tarball = None
-        if 'archive' in self.action :
-            tarball = self.archive(self.params)
-        if 'backup' in self.action and tarball:
-            self.backup(tarball)
-        if 'delete' in self.action and self.can_clean():
-            self.clean()
-    def analyze(self,logs):
-        r = {'clean':self.clean,'archive':self.archive}
-        self.lfolders = [ folder['label'] for folder in logs]
-        #for item in logs :
-        #     if self.can_clean(item) :
-        #         self.archive(item)
-        #         self.clean(item)
diff --git a/src/utils/agents/learner.py b/src/utils/agents/learner.py
deleted file mode 100755
index 8cbf40a..0000000
--- a/src/utils/agents/learner.py
+++ /dev/null
@@ -1,132 +0,0 @@
-"""
-    This file encapsulates a class that is intended to perform learning
-"""
-from __future__ import division
-import numpy as np
-from sklearn import linear_model
-from threading import Thread,RLock
-from utils.transport import *
-from utils.ml import AnomalyDetection,ML
-from utils.params import PARAMS
-import time
-class BaseLearner(Thread):
-    def __init__(self,lock) :
-        Thread.__init__(self)
-        path = PARAMS['path']
-        self.name = self.__class__.__name__.lower()
-        self.rclass= None
-        self.wclass= None
-        self.rw_args=None
-        if os.path.exists(path) :
-            f = open(path)
-            self.config = json.loads(f.read())
-            f.close()
-            self.rclass = self.config['store']['class']['read']
-            self.wclass = self.config['store']['class']['write']
-            self.rw_args = self.config['store']['args']
-        else:
-            self.config = None
-        self.lock = lock
-        self.factory = DataSourceFactory()
-        self.quit = False
-    """
-    This function is designed to stop processing gracefully
-    """
-    def stop(self):
-        self.quit = True
-
-"""
-    This class is intended to apply anomaly detection to various areas of learning
-    The areas of learning that will be skipped are :
-        ['_id','_rev','learn'] ...
-
-    @TODO:
-        - Find a way to perform dimensionality reduction if need be
-"""
-class Anomalies(BaseLearner) :
-    def __init__(self,lock):
-        BaseLearner.__init__(self,lock)
-        if self.config :
-            #
-            # Initializing data store & factory class
-            #
-            self.id = self.config['id']
-            self.apps = self.config['procs'] if 'procs' in self.config else []
-            self.rclass = self.config['store']['class']['read']
-            self.wclass = self.config['store']['class']['write']
-            self.rw_args = self.config['store']['args']
-            # self.factory = DataSourceFactory()
-        self.quit = False
-        # self.lock = lock
-    def format(self,stream):
-        pass
-    def stop(self):
-        self.quit = True
-
-    def run(self):
-        DELAY = self.config['delay'] * 60
-        reader = self.factory.instance(type=self.rclass,args=self.rw_args)
-        data = reader.read()
-        key = 'apps@'+self.id
-        if key in data:
-            rdata = data[key]
-            features = ['memory_usage','cpu_usage']
-            yo = {"1":["running"],"name":"status"}
-            while self.quit == False :
-                print ' *** ',self.name, ' ' , str(datetime.today())
-                for app in self.apps:
-                    print '\t',app,str(datetime.today()),' ** ',app
-                    logs = ML.Filter('label',app,rdata)
-                    if logs :
-                        handler = AnomalyDetection()
-                        value = handler.learn(logs,'label',app,features,yo)
-                        if value is not None:
-                            value = dict(value,**{"features":features})
-                            value = dict({"id":self.id},**value)
-                            #r[id][app] = value
-                            self.lock.acquire()
-                            writer = self.factory.instance(type=self.wclass,args=self.rw_args)
-                            writer.write(label='learn',row=value)
-                            self.lock.release()
-                #
-                if 'MONITOR_CONFIG_PATH' in os.environ :
-                    break
-                time.sleep(DELAY)
-        print ' *** Exiting ',self.name.replace('a','A')
-
-"""
-    Let's estimate how many files we will have for a given date
-        y = ax + b with y: number of files, x: date
-"""
-class Regression(BaseLearner):
-    def __init__(self,lock):
-        BaseLearner.__init__(self,lock)
-        self.folders = self.config['folders']
-        self.id = self.config['id']
-    def run(self):
-        DELAY = self.config['delay'] * 60
-        reader = self.factory.instance(type=self.rclass,args=self.rw_args)
-        data = reader.read()
-        if 'folders' in data :
-            data = ML.Filter('id',self.id,data['folders'])
-            xo = ML.Extract(['date'],data)
-            yo = ML.Extract(['count'],data)
-
-        pass
-        # print np.var(xo,yo)
-
-if __name__ == '__main__' :
-    lock = RLock()
-    thread = Anomalies(lock)
-    thread.start()
\ No newline at end of file
diff --git a/src/utils/agents/manager.py b/src/utils/agents/manager.py
deleted file mode 100755
index f7cc5bd..0000000
--- a/src/utils/agents/manager.py
+++ /dev/null
@@ -1,220 +0,0 @@
-"""
-    Features :
-        - data collection
-        - detection, reboot (service)
-        - respond to commands (service)
-"""
-#from threading import Thread, RLock
-from __future__ import division
-import os
-import json
-import time
-from datetime import datetime
-from utils.transport import *
-import monitor
-import requests
-from threading import Thread
-class Manager() :
-    def version(self):
-        return 1.1
-    """
-        - delay :
-        - limit :
-        - scope : apps,folders,learner,sandbox
-    """
-    def __init__(self):
-        self.factory = DataSourceFactory()
-    def set(self,name,value):
-        setattr(name,value)
-    def init(self,**args) :
-        self.id = args['node']
-        self.agents = args['agents']
-        self.config = dict(args['config'])
-        self.key = args['key']
-        self.actors = args['actors']
-        self.plan = self.config['plan']
-        self.DELAY = int(self.plan['metadata']['delay'])
-        self.host = args['host']
-        self.update() #-- Initializing status information
-        _args={"host":"dev.the-phi.com","qid":self.id,"uid":self.key}
-        #
-        # Connecting to the messaging service
-        self.qlistener = self.factory.instance(type="QueueListener",args=_args)
-        self.qlistener.callback = self.callback
-        self.qlistener.init(self.id)
-        # self.qlistener.read()
-        thread = (Thread(target=self.qlistener.read))
-        thread.start()
-
-    def update(self) :
-        """
-        This method inspects the plans for the current account and makes sure it can/should proceed
-        The user must be subscribed to the service otherwise this is not going to work
-        """
-        url="http://:host/monitor/init/collector".replace(':host',self.host)
-        r = requests.post(url,headers={"key":self.key,"id":self.id})
-        r = json.loads(r.text)
-        # meta = [item['metadata'] for item in plans if item['status']=='active' ]
-        self.plan = r['plan']
-        meta = self.plan['metadata']
-        if meta :
-            self.DELAY = 60* int(meta['delay'])
-            self.LIMIT = int(meta['limit'])
-            #dbname = [item['name'] for item in plans if int(item['metadata']['limit']) == self.LIMIT][0]
-            #self.config['store']['args']['dbname'] = dbname
-        else:
-            self.DELAY = -1
-            self.LIMIT = -1
-        #self.filter(meta)
-        #
-        # We are removing all that is not necessary i.e making sure the features match the plan the user has paid for
-        #
-        self.agents = self.filter('agents',meta,self.agents)
-        self.actors = self.filter('actors',meta,self.actors)
-        self.setup(meta)
-
-    # def filter_collectors(self,meta) :
-    #     """
-    #     remove collectors that are not specified by the plan
-    #     Note that the agents (collectors) have already been initialized ?
-    #     """
-    #     values = meta['agents'].replace(' ','').split(',')
-    #     self.agents = [agent for agent in self.agents if agent.getName() in values]
-
-    # def filter_actors(self,meta):
-    #     """
-    #     removes actors that are NOT specified by the subscription plan
-    #     Note that the actors have already been instantiated and need initialization
-    #     """
-    #     values = meta['actors'].replace(' ','').split('.')
-    #     self.actors = [actor for actor in self.actors if actor.getName() in values]
-
-    def filter(self,id,meta,objects):
-        """
-        This function filters the agents/actors given what is available in the user's plan
-        """
-        values = meta[id].replace(' ','').split(',')
-        return [item for item in objects if item.getName() in values]
-
-    def setup(self,meta) :
-        # conf = {"folders":None,"apps":None}
-        # read_class = self.config['store']['class']['read']
-        # read_args = self.config['store']['args']
-        # args = None
-        # couchdb = self.factory.instance(type=read_class,args=read_args)
-        # args = couchdb.view('config/apps',key=self.key)
-        # if len(args.keys()) > 0 :
-        #     self.apply_setup('apps',args)
-        # args = couchdb.view('config/folders',key=self.key)
-        # if 'folder_size' not in meta :
-        #     # args['threshold'] = meta['folder_size']
-        #     self.apply_setup('folders',meta)
-        #self.apply_setup('folders',meta)
-        #@TODO: For now app actors don't need any particular initialization
-        pass
-
-    def apply_setup(self,name,args) :
-        for actor in self.actors :
-            if args is not None and actor.getName() == name and len(args.keys()) > 0:
-                actor.init(args)
-
-    def isvalid(self):
-        self.update()
-        return self.DELAY > -1 and self.LIMIT > -1
-    def post(self,row) :
-        """
-        This function is designed to take appropriate action if a particular incident has been detected
-        @param label
-        @param row   data pulled/extracted
-        """
-        message = {}
-        message['action'] = 'reboot'
-        message['node'] = label
-
-    def callback(self,channel,method,header,stream):
-        """
-        This function enables the manager to be able to receive messages and delegate them to the appropriate actor
-        @channel
-        """
-        message = json.loads(stream)
-        #
-        # we should inspect the message and ensure it has reached the appropriate recipient
-        #
-        if 'node' in message and message['node'] == self.id :
-            action = message['action']
-            params = message['params']
-            # params['plan'] = self.plan['metadata']
-            self.delegate(action,params)
-
-    def delegate(self,action,params):
-        for actor in self.actors :
-            if actor.isValid(action=action,params=params) :
-                actor.init(action,params)
-                actor.run()
-                break
-        pass
-    def run(self):
-        #DELAY=35*60 #- 35 Minutes
-        #LIMIT=1000
-        COUNT = 0
-        COUNT_STOP = int(24*60/ self.DELAY)
-        write_class = self.config['store']['class']['write']
-        read_args = self.config['store']['args']
-        while True :
-            COUNT += 1
-            if COUNT > COUNT_STOP :
-                if self.isvalid() :
-                    COUNT = 0
-                else:
-                    break
-            for agent in self.agents :
-                data = agent.composite()
-                label = agent.getName()
-                node = '@'.join([label,self.id])
-                row = {}
-                if label == 'folders':
-                    row = [ dict({"id":self.id}, **_row) for _row in data]
-                else:
-                    #label = id
-                    row = data
-                #
-                # @TODO:
-                # This data should be marked if it has been flagged for reboot
-                #
-                if type(row)==list and len(row) == 0 :
-                    continue
-                #
-                #
-                index = self.agents.index(agent)
-                if len(self.actors) > index and self.actors[index].getName() == agent.getName() :
-                    actor = self.actors[index]
-                    actor.analyze(row)
-                # self.lock.acquire()
-                store = self.factory.instance(type=write_class,args=read_args)
-                store.flush(size=self.LIMIT)
-                store.write(label=node,row=[row])
-                # self.lock.release()
-            print (["Falling asleep ",self.DELAY/60])
-            time.sleep(self.DELAY)
diff --git a/src/utils/params.py b/src/utils/params.py
deleted file mode 100755
index ea98de4..0000000
--- a/src/utils/params.py
+++ /dev/null
@@ -1,32 +0,0 @@
-import sys
-PARAMS = {'context':''}
-if len(sys.argv) > 1:
-    N = len(sys.argv)
-    for i in range(1,N):
-        value = None
-        if sys.argv[i].startswith('--'):
-            key = sys.argv[i].replace('-','')
-            PARAMS[key] = 1
-            if i + 1 < N:
-                value = sys.argv[i + 1] = sys.argv[i+1].strip()
-            if key and value:
-                PARAMS[key] = value
-                if key == 'context':
-                    PARAMS[key] = ('/'+value).replace('//','/')
-        i += 2
-
-import logging
-import json
-from datetime import datetime
-class Logger :
-    @staticmethod
-    def init(filename):
-        name = "-".join([filename,datetime.now().strftime('%d-%m-%Y')])+".log"
-        logging.basicConfig(filename=name,level=logging.INFO,format="%(message)s")
-    @staticmethod
-    def log(**args) :
-        args['date'] = datetime.now().strftime('%d-%m-%Y %H:%M:%S')
-        logging.info(json.dumps(args))
diff --git a/src/utils/services.py b/src/utils/services.py
deleted file mode 100755
index b58c03b..0000000
--- a/src/utils/services.py
+++ /dev/null
@@ -1,598 +0,0 @@
-"""
-    CloudView Engine 2.0
-    The Phi Technology LLC
-    Steve L. Nyemba
-
-    This is a basic cloud view engine that is designed to be integrated into any service and intended to work for anyone provided they have signed up with the cloud service provider
-    The intent is to make the engine a general purpose engine that can be either deployed as a service (3-launchpad) or integrated as a data feed for a third party utility
-"""
-from __future__ import division
-from threading import Thread
-import json
-import requests
-from xmljson import yahoo as bf
-from xml.etree.ElementTree import Element, tostring, fromstring, ElementTree as ET
-import xmltodict
-from email.mime.base import MIMEBase
-from email.mime.multipart import MIMEMultipart
-from StringIO import StringIO
-class Cloud:
-    BYTES_TO_GB = 1000000000
-    Config = None
-    STREAMING_URI = None
-    @staticmethod
-    def instance(id,**args):
-        id = id.strip()
-        if id == 'skydrive' :
-            id = 'one-drive'
-        handler = None
-        path = args['path'] if 'path' in args else None
-        if not Cloud.Config and path:
-            f = open(path)
-            Cloud.Config = json.loads(f.read())
-            Cloud.STREAMING_URI = str(Cloud.Config['api'])
-            Cloud.Config = Cloud.Config['cloud']
-            f.close()
-        if path and id in Cloud.Config :
-            context = Cloud.Config[id]
-            className = context['class']
-            config = json.dumps(context['config'])
-            handler = eval( "".join([className,"(",config,")"]))
-        #
-        # In case a stream was passed in ...
-        #
-        if 'stream' in args:
-            stream = args['stream']
-            context = Cloud.Config[id]
-            className = context['class']
-            handler = eval("".join([className,"(None)"]))
-            handler.from_json(stream)
-        #
-        # Once the handler is provided we must retrieve the service given the key
-        # The key provides information about what files to extract as well as the preconditions
-        # @TODO:
-        #     - Keys are maintained within the stripe account/couchdb
-        #
-        return handler
-
-    def __init__(self):
-        self.access_token = None
-        self.refresh_token= None
-        self.files = []
-        self.client_id = None
-        self.secret = None
-        self.mfiles = {}
-        self.folders={}
-
-    def to_json(self):
-        object = {}
-        keys = vars(self)
-        for key in keys:
-            value = getattr(self,key)
-            object[key] = value
-        return json.dumps(object)
-    def from_json(self,stream):
-        ref = json.loads(stream) ;
-        for key in ref.keys() :
-            value = ref[key]
-            setattr(self,key,value)
-        # self.access_token = ref['access_token']
-        # self.refesh_token = ref['refresh_token']
-        # self.files = ref['files']
-    """
-    This function matches a name with a list of possible features/extensions
-    """
-    def match(self,filename,filters):
-        if isinstance(filters,str):
-            filters = [filters]
-        return len(set(filename.lower().split('.')) & set(filters)) > 0
-
-    def getName(self):
-        return self.__class__.__name__.lower()
-    def get_authURL(self):
-        config = Cloud.Config[self.getName()]['config']
-        url = config['authURL']
-        if '?' in url == False:
-            url += '?'
-        keys=['client_id','redirect_uri']
-        p = []
-        for id in keys:
-            value = config[id]
-            p.append(id+'='+value)
-        url = url +"&"+ "&".join(p)
-        return url
-Cloud.Config = {}
-
-class Google(Cloud):
-    def __init__(self,conf=None):
-        Cloud.__init__(self)
-    def getName(self):
-        return 'google-drive'
-    def init(self,token):
-        self.refresh_token = token
-        self._refresh()
-
-    def _refresh(self,code=None):
-        url = "https://accounts.google.com/o/oauth2/token"
-        headers = {"Content-Type":"application/x-www-form-urlencoded"}
-        data = {"client_id":self.client_id,"client_secret":self.secret}
-        if code :
-            grant_type = 'authorization_code'
-            data['code'] = code
-        else:
-            data['refresh_token'] = self.refresh_token
-            grant_type = 'refresh_token'
-        data['grant_type'] = grant_type
-        data['redirect_uri'] = self.redirect_uri
-        resp = requests.post(url,headers=headers,data=data)
-        r = json.loads(resp.text)
-        if 'access_token' in r:
-            self.access_token = r['access_token']
-            self.refresh_token = r['refresh_token'] if 'refresh_token' in r else r['access_token']
-            self.id_token = r['id_token']
-
-    def create_file(self,**args):
-        url = "https://www.googleapis.com/upload/drive/v2/files" ;
-        headers = {"Authorization":"Bearer "+self.access_token}
-        headers['Content-Type'] = args['mimetype']
-        params = args['params']
-        if 'data' not in args :
-            r = requests.post(url,params = params,headers=headers)
-        else:
-            data = args['data']
-            r = requests.post(url,data=data,params = params,headers=headers)
-        return r.json()
-    def update_metadata(self,id,metadata) :
-        url = "https://www.googleapis.com/drive/v2/files"
-        headers = {"Authorization":"Bearer "+self.access_token}
-        headers['Content-Type'] = 'application/json; charset=UTF-8'
-        if id is not None :
-            url += ("/"+id)
-            r = requests.put(url,json=metadata,headers=headers)
-        else:
-            # url += ("/?key="+self.secret)
-            r = requests.post(url,data=json.dumps(metadata),headers=headers)
-        return r.json()
-
-    def upload(self,folder,mimetype,file):
-        """
-        This function will upload a file to a given folder and will provide
-        If the folder doesn't exist it will be created, otherwise the references will be fetched
-        This allows us to avoid having to create several folders with the same name
-        """
-        r = self.get_files(folder)
-        if len(r) == 0 :
-            info = {"name":folder, "mimeType":"application/vnd.google-apps.folder"}
-            r = self.update_metadata(None,{"name":folder,"title":folder, "mimeType":"application/vnd.google-apps.folder"})
-        else:
-            r = r[0]
-        parent = r
-        parent = {"kind":"drive#file","name":folder,"id":parent['id'],"mimeType":"application/vnd.google-apps.folder"}
-        r = self.create_file(data=file.read(),mimetype=mimetype,params={"uploadType":"media"})
-        info = {"title":file.filename,"description":"Create by Cloud View"}
-        info['parents'] = [parent]
-        r = self.update_metadata(r['id'],metadata=info)
-        return r
-
-"""
-    This class is designed to allow users to interact with one-drive
-"""
-class OneDrive(Cloud):
-    def __init__(self,conf):
-        Cloud.__init__(self)
-    def getName(self):
-        return 'one-drive'
-    def init(self,token):
-        self.refresh_token = token
-        self._refresh()
-
-    def _refresh(self,code=None):
-        url = "https://login.live.com/oauth20_token.srf"
-        #url="https://login.microsoftonline.com/common/oauth2/v2.0/token"
-        headers = {"Content-Type":"application/x-www-form-urlencoded"}
-        form = {"client_id":self.client_id,"client_secret":self.secret}
-        if code:
-            grant_type = 'authorization_code'
-            form['code'] = str(code)
-        else:
-            grant_type = 'refresh_token'
-            form['refresh_token'] = self.refresh_token
-        form['grant_type'] = grant_type
-        if self.redirect_uri:
-            form['redirect_uri'] = self.redirect_uri
-        r = requests.post(url,headers=headers,data=form)
-        r = json.loads(r.text)
-        if 'access_token' in r:
-            self.access_token = r['access_token']
-            self.refresh_token = r['refresh_token']
-
-    def upload(self,folder,mimetype,file):
-        """
-        @param folder   parent.id
-        @param name     name of the file with extension
-        @param stream   file content
-        """
-        path = folder+"%2f"+file.filename
-        url = "https://apis.live.net/v5.0/me/skydrive/files/:name?access_token=:token".replace(":name",path).replace(":token",self.access_token) ;
-        header = {"Authorization": "Bearer "+self.access_token,"Content-Type":mimetype}
-        header['Content-Type']= mimetype
-        r = requests.put(url,header=header,files=file)
-        r = r.json()
-        return r
-
-"""
-    This class uses dropbox version 2 API
-"""
-class Dropbox(Cloud):
-    def __init__(self):
-        Cloud.__init__(self)
-    def init(self,access_token):
-        self.access_token = access_token
-    def upload(self,folder,mimetype,file):
-        """
-        @param folder   parent.id
-        @param name     name of the file with extension
-        @param stream   file content
-
-        @TODO: This upload is limited to 150 MB; it is possible to increase this size
-        """
-        url = "https://content.dropboxapi.com/2/files/upload"
-        folder = folder if folder is not None else ""
-        path = "/"+folder+"/"+file.name.split('/')[-1]
-        path = path.replace("//","/")
-        header = {"Authorization":"Bearer "+self.access_token,"Content-Type":mimetype}
-        #header['autorename']= "false"
-        header['mode'] = "add"
-        #header['mute'] = "false"
-        header['Dropbox-API-Arg'] = json.dumps({"path":path})
-        r = requests.post(url,headers=header,data=file.read())
-        print r.text
-        r = r.json()
-        return r
-
-"""
-    This class implements basic interactions with box (cloud service provider)
-    Available functionalities are: authentication, file access, share and stream/download
-"""
-class Box(Cloud) :
-    def __init__(self,conf):
-        Cloud.__init__(self);
-        if conf is not None:
-            self.client_id = conf['client_id']
-            self.secret = conf['secret']
-            self.redirect_uri = conf['redirect_uri'] if 'redirect_uri' in conf else None
-    def init(self,token):
-        self.refresh_token = token
-    def set(self,code) :
-        self._access(code)
-        return 1 if self.access_token else 0
-
-    def _access(self,code):
-        body = {"client_id":self.client_id,"client_secret":self.secret,"grant_type":"authorization_code","code":code,"redirect_uri":self.redirect_uri}
-        headers = {"Content-Type":"application/x-www-form-urlencoded"}
-        url = "https://app.box.com/api/oauth2/token"
-        r = requests.post(url,headers=headers,data=body)
-        r = json.loads(r.text)
-        if 'error' not in r:
-            self.access_token = r['access_token']
-            self.refresh_token= r['refresh_token']
-    def _refresh(self,authToken) :
-        body = {"client_id":self.client_id,"client_secret":self.secret,"grant_type":"refresh_token"}
-        url = "https://app.box.com/api/oauth2/token";
-        headers = {"Content-Type":"application/x-www-form-urlencoded"}
-        r = requests.post(url,headers=headers,data=body)
-        r = json.loads(r.text)
-        if 'error' not in r :
-            self.access_token = r['access_token']
-    def get_user(self):
-        url = "https://api.box.com/2.0/users/me"
-        headers = {"Authorization":"Bearer "+self.access_token}
-        r = requests.get(url,headers=headers)
-        r = json.loads(r.text)
-        if 'login' in r :
-            #BYTES_TO_GB = 1000000000
-            user = {"uii":r['name'],"uid":r['login']}
-            usage = {"size":r['space_amount']/Cloud.BYTES_TO_GB,"used":r['space_used']/Cloud.BYTES_TO_GB,"units":"GB"}
-            user['usage'] = usage
-            return user
-        else:
-            return None
-
-    def format(self,item) :
-        file = {"name":item['name'],"origin":"box","id":item['id'],"url":""}
-        meta = {"last_modified":item['content_modified_at']}
-        return file
-    def get_files(self,ext,url=None):
-        ext = " ".join(ext)
-        url = "https://api.box.com/2.0/search?query=:filter&type=file"
-        url = url.replace(":filter",ext)
-        headers = {"Authorization":"Bearer "+self.access_token}
-        r = requests.get(url,headers=headers) ;
-        r = json.loads(r.text)
-        if 'entries' in r:
-            #self.files = [ self.format(file) for file in r['entries'] if file['type'] == 'file' and 'id' in file]
-            for item in r :
-                if item['type'] == 'file' and 'id' in item :
-                    self.files.append( self.format(item))
-                else:
-                    #
-                    # We are dealing with a folder, this is necessary for uploads
-                    #
-                    self.folder[item['name']] = item["id"]
-        return self.files
-    def stream(self,url):
-        headers = {"Authorization":"Bearer "+self.access_token}
-        r = requests.get(url,headers=headers,stream=True)
-        yield r.content
-    def share(self,id):
-        url = "https://api.box.com/2.0/files/:id".replace(":id",id);
-        headers = {"Authorization":"Bearer "+self.access_token,"Content-Type":"application/json"}
-        body = {"shared_link":{"access":"open","permissions":{"can_download":True}}}
-        r = requests.put(url,headers=headers,data=json.dumps(body))
-        r = json.loads(r.text)
-        if 'shared_link' in r:
-            return r['shared_link']['download_url']
-        return None
-    def upload(self,folder,mimetype,file):
-        """
-        @param folder   parent.id
-        @param name     name of the file with extension
-        @param stream   file content
-        """
-        if folder not in self.folders :
-            #
-            # Let us create the folder now
-            #
-            url = "https://api.box.com/2.0/folders"
-            header = {"Authorization":"Bearer "+self.access_token}
-            pid = self.folders["/"] if "/" in self.folders else self.folders[""]
-            data = {"parent":{"id":str(pid)}}
-            r = requests.post(url,header=header,data=data)
-            r = r.json()
-            pid = r["id"]
-        else:
-            pid = self.folders[folder]
-        url = "https://upload.box.com/api/2.0/files/content"
-        header = {"Authorization Bearer ":self.access_token,"Content-Type":mimetype}
-        r = requests.post(url,header=header,file=file)
-        r = r.json()
-        return r
-
-class SugarSync(Cloud):
-    def __init__(self):
-        Cloud.__init__(self)
-
-    def __init__(self,conf):
-        Cloud.__init__(self);
-        if conf is not None:
-            self.client_id = conf['app_id']
-            self.private_key = conf['private_key']
-            self.access_key = conf['access_key']
-            #self.access_token = None
-            #self.refresh_token= None
-            # self.redirect_uri = conf['redirect_uri'] if 'redirect_uri' in conf else None
-            #self.files = []
-    def init(self,token):
-        self.refresh_token = token
-        self._refresh()
-    def login(self,email,password):
-        xml = ':username:password:app_id:accesskey:privatekey'
-        xml = xml.replace(":app_id",self.app_id).replace(":privatekey",self.private_key).replace(":accesskey",self.access_key).replace(":username",email).replace(":password",password)
-        headers = {"Content-Type":"application/xml","User-Agent":"The Phi Technology"}
-        r = requests.post(url,headers=headers,data=xml)
-        self.refresh_token = r.headers['Location']
-
-    def _refresh(self):
-        xml = ':accesskey:privatekey:authtoken'
-        xml = xml.replace(":accesskey",self.access_key).replace(":privatekey",self.private_key).replace(":authtoken",self.refresh_token)
-        headers = {"Content-Type":"application/xml","User-Agent":"The Phi Technology LLC"}
-        url = "https://api.sugarsync.com/authorization"
-        r = requests.post(url,data=xml,headers=headers)
-        self.access_token = r.headers['Location']
-    def format(self,item):
-        file = {}
-        file['name'] = item['displayName']
-        file['url'] = item['fileData']
-        file['id'] = item['ref']
-        meta = {}
-        meta['last_modified'] = item['lastModified']
-        file['meta'] = meta
-        return file
-
-    def get_files(self,ext,url=None) :
-        if url is None:
-            url = "https://api.sugarsync.com/folder/:sc:3989243:2/contents";
-        headers = {"Authorization":self.access_token,"User-Agent":"The Phi Technology LLC","Content-Type":"application/xml;charset=utf-8"}
-        r = requests.get(url,headers=headers)
-        stream = r.text #.encode('utf-8')
-        r = xmltodict.parse(r.text)
-        if 'collectionContents' in r:
-            r = r['collectionContents']
-            #
-            # Extracting files in the current folder then we will see if there are any subfolders
-            # The parser has weird behaviors that leave inconsistent objects (field names)
-            # This means we have to filter it out by testing the item being processed
-            if 'file' in r:
-                if isinstance(r['file'],dict):
-                    self.files += [ self.format(r['file']) ]
-                else:
-                    #self.files += [self.format(item) for item in r['file'] if isinstance(item,(str, unicode)) == False and item['displayName'].endswith(ext)]
-                    self.files += [self.format(item) for item in r['file'] if isinstance(item,(str, unicode)) == False and self.match(item['displayName'],ext)]
-            if 'collection' in r:
-                if isinstance(r['collection'],dict) :
-                    #
-                    # For some unusual reason the parser handles single instances as objects instead of a collection
-                    # @NOTE: This is a behavior that happens when a single item is in the collection
-                    #
-                    self.get_files(ext,r['collection']['contents'])
-                for item in r['collection'] :
-                    if 'contents' in item:
-                        if isinstance(item,(str, unicode)) == False:
-                            self.files += self.get_files(ext,item['contents'])
-                #[ self.get_files(ext,item['contents']) for item in r['collection'] if item['type'] == 'folder']
-        return self.files
-
-    def get_user(self):
-        url = "https://api.sugarsync.com/user"
-        headers = {"Authorization":self.access_token,"User-Agent":"The Phi Technology LLC","Content-Type":"application/xml;charset=utf-8"}
-        r = requests.get(url,headers=headers)
-        r = xmltodict.parse(r.text)
-        r = r['user']
-        if 'username' in r and 'quota' in r:
-            user = {"uid":r['username'],"uii":r['nickname']}
-            size = long(r['quota']['limit'])
-            used = long(r['quota']['usage'])
-            usage = {"size":size/Cloud.BYTES_TO_GB,"used":used/Cloud.BYTES_TO_GB,"units":"GB"}
-            user['usage'] = usage
-            return user
-        else:
-            return None
-    def stream(self,url):
-        headers = {"Authorization":self.access_token}
-        r = requests.get(url,headers=headers,stream=True)
-        yield r.content
-    """
-    This function will create a public link and share it to designated parties
-    """
-    def share(self,id):
-        url = "https://api.sugarsync.com/file/:id".replace(":id",id);
-        xml = '';
-        headers = {"Content-Type":"application/xml","Authorization":self.access_token,"User-Agent":"The Phi Technology LLC"}
-        r = requests.put(url,header=header,data=xml)
-        r = xmltodict.parse(r.text)
-        if 'file' in r:
-            return r['file']['publicLink']['content']+"?directDownload=true"
-        else:
-            return None
-
-    def upload(self,folder,mimetype,file):
-        name = folder+"/"+file.filename
-        xml = ':name:type'
-        xml = xml.replace(':name',name).replace(':type',mimetype)
-        header = {"content-type":"application/xml","User-Agent":"The Phi Technology LLC"}
-        header['Authorization'] = self.access_token
-        r = requests.post(url,headers=header,files=file,data=xml)
-        pass
-
-class iTunes(Cloud):
-    def __init__(self):
-        Cloud.__init__(self)
-        self.url_topsongs = "http://ax.itunes.apple.com/WebObjects/MZStoreServices.woa/ws/RSS/topsongs/limit=:limit/explicit=false/json"
-        self.url_search = "http://itunes.apple.com/search?term=:keyword&limit=:limit&media=music"
-    def parse_search(self,obj):
-        files = []
-        try:
-            logs = obj['results']
-            for item in logs :
-                file = {}
-                file['id'] = item['trackId']
-                file['name'] = item['trackName']
-                file['id3'] = {}
-                file['id3']['track'] = item['trackName']
-                file['id3']['title'] = item['trackName']
-                file['id3']['artist']= item['artistName']
-                file['id3']['album'] = item['collectionName']
-                file['id3']['genre'] = item['primaryGenreName']
-                file['id3']['poster']= item['artworkUrl100']
-                file['url'] = item['previewUrl']
-                files.append(file)
-        except Exception,e:
-            print e
-        return files
-    def parse_chart(self,obj):
-        """
-        This function will parse the topsongs returned by the itunes API
-        """
-        files = []
-        try:
-            logs = obj['feed']['entry']
-            if isinstance(logs,dict) :
-                logs = [logs]
-            for item in logs :
-                file = {'name':item['im:name']['label'],'id3':{}}
-                file['id'] = item['id']['attributes']['im:id']
-                file['id3'] = {}
-                file['id3']['artist'] = item['im:artist']['label']
-                file['id3']['track'] = item['title']['label']
-                file['id3']['title'] = item['title']['label']
-                file['id3']['album'] = item['im:collection']['im:name']['label']
-                file['id3']['genre'] = item['category']['attributes']['term']
-                index = len(item['im:image'])-1
-                file['id3']['poster'] = item['im:image'][index]['label']
-                url = [link['attributes']['href'] for link in item['link'] if 'im:assetType' in link['attributes'] and link['attributes']['im:assetType']=='preview']
-                if len(url) > 0:
-                    url = url[0]
-                    file['url'] = url #item['link'][1]['attributes']['href'] //'im:assetType' == 'preview' and 'im:duration' is in the sub-item
-                    files.append(file)
-                else:
-                    continue
-        except Exception,e:
-            print e
-            #
-            # @TODO: Log the error somewhere to make it useful
return files - - def parse(self,obj) : - if 'feed' in obj and 'entry' in obj['feed']: - return self.parse_chart(obj) - elif 'results' in obj : - return self.parse_search(obj) - else: - return [] - def get_files(self,keyword=None,limit="1") : - url = self.url_search if keyword is not None else self.url_topsongs - keyword = "" if keyword is None else keyword - # limit = "50" if keyword == "" else "1" - - url = url.replace(":keyword",keyword.replace(' ','+')).replace(':limit',limit) - r = requests.get(url) - r= r.json() - return self.parse(r) diff --git a/src/utils/transport.py b/src/utils/transport.py deleted file mode 100755 index 0c28aaa..0000000 --- a/src/utils/transport.py +++ /dev/null @@ -1,709 +0,0 @@ -""" - This file implements data transport stuctures in order to allow data to be moved to and from anywhere - We can thus read data from disk and write to the cloud,queue, or couchdb or SQL -""" -from flask import request, session -import os -import pika -import json -import numpy as np -from couchdbkit import Server -import re -from csv import reader -from datetime import datetime -""" - @TODO: Write a process by which the class automatically handles reading and creating a preliminary sample and discovers the meta data -""" -class Reader: - def __init__(self): - self.nrows = 0 - self.xchar = None - - def row_count(self): - content = self.read() - return np.sum([1 for row in content]) - """ - This function determines the most common delimiter from a subset of possible delimiters. It uses a statistical approach to guage the distribution of columns for a given delimiter - """ - def delimiter(self,sample): - - m = {',':[],'\t':[],'|':[],'\x3A':[]} - delim = m.keys() - for row in sample: - for xchar in delim: - if row.split(xchar) > 1: - m[xchar].append(len(row.split(xchar))) - else: - m[xchar].append(0) - - - - # - # The delimiter with the smallest variance, provided the mean is greater than 1 - # This would be troublesome if there many broken records sampled - # - m = {id: np.var(m[id]) for id in m.keys() if m[id] != [] and int(np.mean(m[id]))>1} - index = m.values().index( min(m.values())) - xchar = m.keys()[index] - - return xchar - """ - This function determines the number of columns of a given sample - @pre self.xchar is not None - """ - def col_count(self,sample): - - m = {} - i = 0 - - for row in sample: - row = self.format(row) - id = str(len(row)) - #id = str(len(row.split(self.xchar))) - - if id not in m: - m[id] = 0 - m[id] = m[id] + 1 - - index = m.values().index( max(m.values()) ) - ncols = int(m.keys()[index]) - - - return ncols; - """ - This function will clean records of a given row by removing non-ascii characters - @pre self.xchar is not None - """ - def format (self,row): - - if isinstance(row,list) == False: - # - # We've observed sometimes fields contain delimiter as a legitimate character, we need to be able to account for this and not tamper with the field values (unless necessary) - cols = self.split(row) - #cols = row.split(self.xchar) - else: - cols = row ; - return [ re.sub('[^\x00-\x7F,\n,\r,\v,\b,]',' ',col.strip()).strip().replace('"','') for col in cols] - - #if isinstance(row,list) == False: - # return (self.xchar.join(r)).format('utf-8') - #else: - # return r - """ - This function performs a split of a record and tries to attempt to preserve the integrity of the data within i.e accounting for the double quotes. 
- @pre : self.xchar is not None - """ - def split (self,row): - - pattern = "".join(["(?:^|",self.xchar,")(\"(?:[^\"]+|\"\")*\"|[^",self.xchar,"]*)"]) - return re.findall(pattern,row.replace('\n','')) - -class Writer: - - def format(self,row,xchar): - if xchar is not None and isinstance(row,list): - return xchar.join(row)+'\n' - elif xchar is None and isinstance(row,dict): - row = json.dumps(row) - return row - """ - It is important to be able to archive data so as to insure that growth is controlled - Nothing in nature grows indefinitely neither should data being handled. - """ - def archive(self): - pass - def flush(self): - pass - -""" - This class is designed to read data from an Http request file handler provided to us by flask - The file will be heald in memory and processed accordingly - NOTE: This is inefficient and can crash a micro-instance (becareful) -""" -class HttpRequestReader(Reader): - def __init__(self,**params): - self.file_length = 0 - try: - - #self.file = params['file'] - #self.file.seek(0, os.SEEK_END) - #self.file_length = self.file.tell() - - #print 'size of file ',self.file_length - self.content = params['file'].readlines() - self.file_length = len(self.content) - except Exception, e: - print "Error ... ",e - pass - - def isready(self): - return self.file_length > 0 - def read(self,size =-1): - i = 1 - for row in self.content: - i += 1 - if size == i: - break - yield row - -""" - This class is designed to write data to a session/cookie -""" -class HttpSessionWriter(Writer): - """ - @param key required session key - """ - def __init__(self,**params): - self.session = params['queue'] - self.session['sql'] = [] - self.session['csv'] = [] - self.tablename = re.sub('..+$','',params['filename']) - self.session['uid'] = params['uid'] - #self.xchar = params['xchar'] - - - def format_sql(self,row): - values = "','".join([col.replace('"','').replace("'",'') for col in row]) - return "".join(["INSERT INTO :table VALUES('",values,"');\n"]).replace(':table',self.tablename) - def isready(self): - return True - def write(self,**params): - label = params['label'] - row = params ['row'] - - if label == 'usable': - self.session['csv'].append(self.format(row,',')) - self.session['sql'].append(self.format_sql(row)) - -""" - This class is designed to read data from disk (location on hard drive) - @pre : isready() == True -""" -class DiskReader(Reader) : - """ - @param path absolute path of the file to be read - """ - def __init__(self,**params): - Reader.__init__(self) - self.path = params['path'] ; - - def isready(self): - return os.path.exists(self.path) - """ - This function reads the rows from a designated location on disk - @param size number of rows to be read, -1 suggests all rows - """ - def read(self,size=-1): - f = open(self.path,'rU') - i = 1 - for row in f: - - i += 1 - if size == i: - break - yield row - f.close() -""" - This function writes output to disk in a designated location -""" -class DiskWriter(Writer): - def __init__(self,**params): - if 'path' in params: - self.path = params['path'] - else: - self.path = None - if 'name' in params: - self.name = params['name']; - else: - self.name = None - if os.path.exists(self.path) == False: - os.mkdir(self.path) - """ - This function determines if the class is ready for execution or not - i.e it determines if the preconditions of met prior execution - """ - def isready(self): - - p = self.path is not None and os.path.exists(self.path) - q = self.name is not None - return p and q - """ - This function writes a record to a 
-"""
-    This class writes output to a designated location on disk
-"""
-class DiskWriter(Writer):
-    def __init__(self,**params):
-        if 'path' in params:
-            self.path = params['path']
-        else:
-            self.path = None
-        if 'name' in params:
-            self.name = params['name']
-        else:
-            self.name = None
-        if self.path is not None and not os.path.exists(self.path):
-            os.mkdir(self.path)
-    """
-        This function determines if the class is ready for execution or not,
-        i.e it determines if the preconditions are met prior to execution
-    """
-    def isready(self):
-
-        p = self.path is not None and os.path.exists(self.path)
-        q = self.name is not None
-        return p and q
-    """
-        This function writes a record to a designated file
-        @param    label
-        @param    row    row to be written
-    """
-    def write(self,**params):
-        label = params['label']
-        row = params['row']
-        xchar = None
-        if 'xchar' in params:
-            xchar = params['xchar']
-        path = ''.join([self.path,os.sep,label])
-        if not os.path.exists(path):
-            os.mkdir(path)
-        path = ''.join([path,os.sep,self.name])
-        f = open(path,'a')
-        row = self.format(row,xchar)
-        f.write(row)
-        f.close()
-"""
-    This class hierarchy is designed to handle interactions with a queue server using the pika framework (our tests are based on RabbitMQ)
-"""
-class MessageQueue:
-    def __init__(self,**params):
-        self.host = params['host']
-        self.uid = params['uid']
-        self.qid = params['qid']
-
-    def isready(self):
-        resp = getattr(self,'connection',None) is not None and self.connection.is_open
-        self.close()
-        return resp
-    def close(self):
-        if not self.connection.is_closed:
-            self.channel.close()
-            self.connection.close()
-"""
-    This class is designed to publish content to an AMQP server (RabbitMQ)
-    The class will rely on pika to implement this functionality
-
-    We will publish information to a given queue for a given exchange
-"""
-class QueueWriter(MessageQueue,Writer):
-    def __init__(self,**params):
-        MessageQueue.__init__(self,**params)
-
-    def init(self,label=None):
-        properties = pika.ConnectionParameters(host=self.host)
-        self.connection = pika.BlockingConnection(properties)
-        self.channel = self.connection.channel()
-        # NOTE: recent versions of pika renamed the 'type' argument to 'exchange_type'
-        self.info = self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True)
-        if label is None:
-            self.qhandler = self.channel.queue_declare(queue=self.qid,durable=True)
-        else:
-            self.qhandler = self.channel.queue_declare(queue=label,durable=True)
-
-        self.channel.queue_bind(exchange=self.uid,queue=self.qhandler.method.queue)
-
-    """
-        This function writes a stream of data to a given queue
-        @param row    object to be written (converted to JSON if it isn't a string)
-        @TODO: make this less chatty
-    """
-    def write(self,**params):
-        xchar = None
-        if 'xchar' in params:
-            xchar = params['xchar']
-        payload = self.format(params['row'],xchar)
-
-        label = params['label']
-        self.init(label)
-        _mode = 2  #-- persistent delivery
-        if isinstance(payload,str):
-            stream = payload
-            _type = 'text/plain'
-        else:
-            stream = json.dumps(payload)
-            if 'type' in params:
-                _type = params['type']
-            else:
-                _type = 'application/json'
-
-        self.channel.basic_publish(
-            exchange=self.uid,
-            routing_key=label,
-            body=stream,
-            properties=pika.BasicProperties(content_type=_type,delivery_mode=_mode)
-        )
-        self.close()
-
-    def flush(self,label):
-        self.init(label)
-        self.channel.queue_delete(queue=label)
-        self.close()
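-
-#
-# A minimal usage sketch for QueueWriter, assuming a RabbitMQ broker is
-# reachable on localhost; the exchange and queue names are hypothetical.
-#
-# q = QueueWriter(host='localhost',uid='demo.exchange',qid='demo.queue')
-# q.write(label='demo.queue',row={'status':'ok'})  # the dict is JSON-serialized by Writer.format
-# q.flush('demo.queue')                            # deletes the queue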
-"""
-    This class will read from a queue provided an exchange, a queue and a host
-    @TODO: Account for security and virtual hosts
-"""
-class QueueReader(MessageQueue,Reader):
-    """
-        @param    host    host
-        @param    uid     exchange identifier
-        @param    qid     queue identifier
-    """
-    def __init__(self,**params):
-        MessageQueue.__init__(self,**params)
-        if 'durable' in params:
-            self.durable = True
-        else:
-            self.durable = False
-        self.size = -1
-        self.data = {}
-    def init(self,qid):
-
-        properties = pika.ConnectionParameters(host=self.host)
-        self.connection = pika.BlockingConnection(properties)
-        self.channel = self.connection.channel()
-        self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True)
-
-        self.info = self.channel.queue_declare(queue=qid,durable=True)
-
-    """
-        This is the callback function designed to process the data stream from the queue
-    """
-    def callback(self,channel,method,header,stream):
-
-        if re.match("^\{|\[",stream) is not None:
-            r = json.loads(stream)
-        else:
-            r = stream
-
-        qid = self.info.method.queue
-        if qid not in self.data:
-            self.data[qid] = []
-
-        self.data[qid].append(r)
-        #
-        # We stop reading once all the messages of the queue have been collected
-        #
-        if self.size == len(self.data[qid]) or len(self.data[qid]) == self.info.method.message_count:
-            self.close()
-
-    """
-        This function reads messages from a queue
-        @TODO:
-            Implement channel.basic_get in order to retrieve a single message at a time
-            Have the number of messages retrieved be specified by size (parameter)
-    """
-    def read(self,size=-1):
-        self.size = size
-        #
-        # We enabled the reader to be able to read from several queues (sequentially for now)
-        # The qid parameter will be an array of the queues the reader will be reading from
-        #
-        if not isinstance(self.qid,list):
-            self.qid = [self.qid]
-        for qid in self.qid:
-            self.init(qid)
-
-            if self.info.method.message_count > 0:
-                self.channel.basic_consume(self.callback,queue=qid,no_ack=False)
-                self.channel.start_consuming()
-
-        return self.data
-class QueueListener(QueueReader):
-    def init(self,qid):
-        properties = pika.ConnectionParameters(host=self.host)
-        self.connection = pika.BlockingConnection(properties)
-        self.channel = self.connection.channel()
-        self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True)
-
-        self.info = self.channel.queue_declare(passive=True,exclusive=True,queue=qid)
-
-        self.channel.queue_bind(exchange=self.uid,queue=self.info.method.queue,routing_key=qid)
-    def read(self):
-
-        self.init(self.qid)
-        self.channel.basic_consume(self.callback,queue=self.qid,no_ack=True)
-        self.channel.start_consuming()
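-
-#
-# A minimal usage sketch for QueueReader, assuming the same hypothetical
-# broker and names as above; read() drains the queue(s) and returns a
-# dictionary keyed by queue name.
-#
-# q = QueueReader(host='localhost',uid='demo.exchange',qid='demo.queue')
-# data = q.read()          # e.g. {'demo.queue': [...]}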
-"""
-    This class is designed to write output as SQL insert statements
-    The class will inherit from DiskWriter with minor adjustments
-    @TODO: Include a script to create the table if need be, using the upper bound of a learner
-"""
-class SQLDiskWriter(DiskWriter):
-    def __init__(self,**args):
-        DiskWriter.__init__(self,**args)
-        self.tablename = re.sub('\..+$','',self.name).replace(' ','_')
-    """
-        @param label
-        @param row
-        @param xchar
-    """
-    def write(self,**args):
-        label = args['label']
-        row = args['row']
-
-        if label == 'usable':
-            values = "','".join([col.replace('"','').replace("'",'') for col in row])
-            row = "".join(["INSERT INTO :table VALUES('",values,"');\n"]).replace(':table',self.tablename)
-
-        args['row'] = row
-        DiskWriter.write(self,**args)
-class Couchdb:
-    """
-        @param    uri       host & port reference
-        @param    uid       user id involved
-        @param    dbname    database name (target)
-    """
-    def __init__(self,**args):
-        uri = args['uri']
-        self.uid = args['uid']
-        dbname = args['dbname']
-        self.server = Server(uri=uri)
-        self.dbase = self.server.get_db(dbname)
-        if not self.dbase.doc_exist(self.uid):
-            self.dbase.save_doc({"_id":self.uid})
-    """
-        Ensuring the preconditions are met for processing
-    """
-    def isready(self):
-        p = self.server.info() != {}
-        if not p or self.dbase.dbname not in self.server.all_dbs():
-            return False
-        #
-        # At this point we are sure that the server is connected
-        # We are also sure that the database actually exists
-        #
-        return self.dbase.doc_exist(self.uid)
-    def view(self,id,**args):
-        r = self.dbase.view(id,**args)
-        r = r.all()
-        return r[0]['value'] if len(r) > 0 else []
-
-"""
-    This class will read an attachment from CouchDB and return it to the calling code.
-    The attachment must have been placed beforehand (otherwise oops)
-    @TODO: Account for security & access control
-"""
-class CouchdbReader(Couchdb,Reader):
-    """
-        @param    filename    filename (attachment)
-    """
-    def __init__(self,**args):
-        #
-        # setting the basic parameters
-        Couchdb.__init__(self,**args)
-        if 'filename' in args:
-            self.filename = args['filename']
-        else:
-            self.filename = None
-
-    def isready(self):
-        #
-        # Is the basic information about the database valid?
-        #
-        if not Couchdb.isready(self):
-            return False
-        #
-        # The database name is set and correct at this point
-        # We make sure the document of the given user has the requested attachment
-        #
-        doc = self.dbase.get(self.uid)
-
-        if '_attachments' in doc:
-            return self.filename in doc['_attachments'].keys()
-        return False
-    def stream(self,size=-1):
-        # yield the attachment line by line, up to size lines (-1 means all)
-        content = self.dbase.fetch_attachment(self.uid,self.filename).split('\n')
-        for i,row in enumerate(content):
-            if size > -1 and i >= size:
-                break
-            yield row
-
-    def read(self,size=-1):
-        if self.filename is not None:
-            return self.stream(size)
-        else:
-            return self.basic_read()
-    def basic_read(self):
-        document = self.dbase.get(self.uid)
-        del document['_id'], document['_rev']
-        return document
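-
-#
-# A minimal usage sketch for CouchdbReader, assuming a CouchDB server on
-# localhost; the uri, uid, dbname and filename values are hypothetical.
-#
-# reader = CouchdbReader(uri='http://localhost:5984',uid='demo@the-phi.com',
-#                        dbname='demo',filename='sample.csv')
-# if reader.isready():
-#     for row in reader.read(size=10):
-#         print(row)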
-"""
-    This class will write to a CouchDB document, provided a scope
-    The scope is the attribute that will be updated on the CouchDB document
-"""
-class CouchdbWriter(Couchdb,Writer):
-    """
-        @param    uri         host & port reference
-        @param    uid         user id involved
-        @param    filename    filename (attachment)
-        @param    dbname      database name (target)
-    """
-    def __init__(self,**args):
-        # the base class connects to the server and creates the document if it doesn't exist
-        Couchdb.__init__(self,**args)
-        if 'filename' in args:
-            self.filename = args['filename']
-        else:
-            self.filename = None
-
-    """
-        write a given attribute to a document database
-        @param    label    scope of the row repair|broken|fixed|stats
-        @param    row      row to be written
-    """
-    def write(self,**params):
-
-        document = self.dbase.get(self.uid)
-        label = params['label']
-
-        if 'row' in params:
-            row = params['row']
-            row_is_list = isinstance(row,list)
-            if label not in document:
-                document[label] = row if row_is_list else [row]
-            elif isinstance(document[label][0],list):
-                document[label] += row
-            else:
-                document[label].append(row)
-        else:
-            if label not in document:
-                document[label] = {}
-            if isinstance(params['data'],dict):
-                document[label] = dict(document[label],**params['data'])
-            else:
-                document[label] = params['data']
-
-        self.dbase.save_doc(document)
-    def flush(self,**params):
-
-        size = params['size'] if 'size' in params else 0
-        document = self.dbase.get(self.uid)
-        for key in document:
-            if key in ['_id','_rev','_attachments']:
-                continue
-            content = document[key]
-            if isinstance(content,list) and size > 0:
-                # keep only the last `size` entries
-                document[key] = content[len(content)-size:]
-            else:
-                document[key] = {}
-
-        self.dbase.save_doc(document)
-
-    def archive(self,params=None):
-        document = self.dbase.get(self.uid)
-        content = {}
-        _doc = {}
-        for id in document:
-            if id in ['_id','_rev','_attachments']:
-                _doc[id] = document[id]
-            else:
-                content[id] = document[id]
-
-        content = json.dumps(content)
-        document = _doc
-        now = str(datetime.today())
-        # archive the stripped attributes as a timestamped JSON attachment
-        name = '-'.join([document['_id'],now])+'.json'
-        self.dbase.save_doc(document)
-        self.dbase.put_attachment(document,content,name,'application/json')
-"""
-    This class acts as a factory that generates an instance of a Reader/Writer
-    against a Queue, Disk, Cloud or CouchDB.
-    The class doesn't enforce parameter validation, thus any error with the parameters sent will result in a null object
-"""
-class DataSourceFactory:
-    def instance(self,**args):
-        source = args['type']
-        params = args['args']
-        anObject = None
-
-        if source in ['HttpRequestReader','HttpSessionWriter']:
-            #
-            # @TODO: Make sure objects are serializable, be smart about them !!
-            #
-            aClassName = ''.join([source,'(**params)'])
-        else:
-            stream = json.dumps(params)
-            aClassName = ''.join([source,'(**',stream,')'])
-        try:
-            # NOTE: eval resolves the class name in this module's namespace
-            anObject = eval(aClassName)
-        except Exception as e:
-            print("Error",e)
-        return anObject
-"""
-    This class implements a data-source handler that is intended to be used within the context of data processing; it allows reading/writing anywhere transparently.
-    The class is a facade to a heterogeneous class hierarchy and thus simplifies how the calling code interacts with that hierarchy
-"""
-class DataSource:
-    def __init__(self,sourceType='Disk',outputType='Disk',params={}):
-        factory = DataSourceFactory()
-        self.Input = factory.instance(type=sourceType,args=params)
-        self.Output = factory.instance(type=outputType,args=params)
-    def read(self,size=-1):
-        return self.Input.read(size)
-    def write(self,**args):
-        self.Output.write(**args)
-#p = {}
-#p['host'] = 'dev.the-phi.com'
-#p['uid'] = 'nyemba@gmail.com'
-#p['qid'] = 'repair'
-#factory = DataSourceFactory()
-#o = factory.instance(type='QueueReader',args=p)
-#print(o is None)
-#q = QueueWriter(host='dev.the-phi.com',uid='nyemba@gmail.com')
-#q.write(object='steve')
-#q.write(object='nyemba')
-#q.write(object='elon')
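-
-#
-# A minimal usage sketch for the factory and the DataSource facade; the
-# class names map onto this module and the path below is hypothetical.
-#
-# factory = DataSourceFactory()
-# reader = factory.instance(type='DiskReader',args={'path':'/tmp/sample.csv'})
-# ds = DataSource(sourceType='DiskReader',outputType='DiskWriter',
-#                 params={'path':'/tmp','name':'sample.csv'})
-# rows = list(ds.read(size=10))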