removing unused packages

data-collector
Steve L. Nyemba 4 years ago
parent d3c0ae08be
commit 1a9c4b6630

@ -1,134 +0,0 @@
h="""
This is a data-collector client that performs data-collection operations and submits them to an endpoint
@required:
- key application/service key
- id node identifier
usage :
python data-collector.py --path config.json
The configuration file is a JSON object structured as follows :
{
"id": "<node identifier>",
"key": "<customer's identification key>",
"api": "http://localhost/monitor/1/client",
"folders": []
}
NOTE: You can download a sample configuration file from https://the-phi.com/smart-top
"""
from utils.params import PARAMS as SYS_ARGS, Logger
import os
import requests
import json
# from threading import Thread, RLock
from monitor import Apps, Folders
import time
from datetime import datetime
class Collector :
def __init__(self) :
"""
Basic initialization; the runtime configuration is read from SYS_ARGS (see init)
"""
self.httpclient = requests.Session()
def init(self):
# if 'folders' not in SYS_ARGS :
# #
# # If nothing is set it will monitor the temporary directory
# self.locations = [os.environ[name] for name in ['TEMP','TMP','TMPDIR'] if name in os.environ]
# else:
# self.locations = SYS_ARGS['folders']
#
# -- let's get the list of features we are interested in
url = SYS_ARGS['api']+'/1/client/login'
key = SYS_ARGS['key']
self.id = SYS_ARGS['id'] if 'id' in SYS_ARGS else os.environ['HOSTNAME']
headers = {"key":key,"id":self.id}
#
#-- what features are allowed
r = self.httpclient.post(url,headers=headers)
if r.status_code == 200 :
r = r.json()
self.features = r['features']
self.config = r['config'] #-- contains apps and folders
Logger.log(action="login",value=r)
else:
self.features = None
self.config = None
Logger.log(action='login',value='error')
def callback(self,channel,method,header,stream):
pass
def listen(self):
# factory = DataSourceFactory() #-- requires `from utils.transport import *`
# self.qlistener = factory.instance(type="QueueListener",args=_args)
# self.qlistener.callback = self.callback
# self.qlistener.init(SYS_ARGS['id'])
pass
def post(self,**args) :
"""
This function will post data to the endpoint
"""
url = SYS_ARGS['api']+'/1/client/log'
key = SYS_ARGS['key']
id = SYS_ARGS['id'] if 'id' in SYS_ARGS else os.environ['HOSTNAME']
headers = {"key":key,"id":id,"context":args['context'],"content-type":"application/json"}
body = args['data'].fillna('').to_json(orient='records')
if args['data'].shape[0] > 0 :
r = self.httpclient.post(url,headers=headers,data=body)
Logger.log(action="post."+args['context'],value=r.status_code)
else:
Logger.log(action="data.error",value="no data :: "+args['context'])
def run(self):
"""
This function will execute the basic functions to monitor folders and apps running on the system
given the configuration specified on the server .
"""
while True :
try:
ELAPSED_TIME = 60 #-- default sleep (seconds) when no configuration could be retrieved
self.init()
if self.config and self.features :
ELAPSED_TIME = 60* int(self.features['schedule'].replace("min","").strip())
if 'apps' in self.config :
self.post( data=(Apps(node=self.id)).get(filter=self.config['apps']),context="apps")
if 'folders' in self.config and self.config['folders'] :
folder = Folders(node=self.id)
f = folder.get(path=self.config['folders'])
self.post(data = f ,context="folders")
Logger.log(action='sleeping',value=ELAPSED_TIME)
#
# In case no configuration is provided, the system will simply fall asleep and wait
# @TODO: Evaluate whether to wake up the system or not (security concerns)!
#
time.sleep(ELAPSED_TIME)
except Exception as e:
Logger.log(action='error',value=str(e))
print (e)
#break
pass
if __name__ == '__main__' :
#
#
if 'path' in SYS_ARGS :
path = SYS_ARGS['path']
f = open(path)
SYS_ARGS = json.loads(f.read())
f.close()
Logger.init('data-collector')
collector = Collector()
collector.run()
else:
print (h)

@ -1,174 +0,0 @@
"""
Steve L. Nyemba <steve@the-phi.com>
The Phi Technology - Smart Top
This program is the core for evaluating folders and applications. Each class is specialized to generate a report in a pandas data-frame
The classes will focus on Apps, Folders and Protocols
- SmartTop.get(**args)
@TODO:
Protocols (will be used in anomaly detection)
"""
from __future__ import division
import os
import subprocess
import numpy as np
import sys
import pandas as pd
import datetime
class SmartTop:
def __init__(self,**args):
self.node = args['node']
def get(self,**args):
return None
class Apps(SmartTop) :
def __init__(self,**args):
"""
This class will process a system command (ps) and parse the output into a matrix of process records
@param node node identifier
"""
SmartTop.__init__(self,**args)
self.cmd = "ps -eo pid,user,pmem,pcpu,stat,etime,args|awk 'OFS=\";\" {$1=$1; if($5 > 9) print }'"
self.xchar = ';'
def get_app(self,stream):
index = 1 if os.path.exists(" ".join(stream[:1])) else len(stream)-1
cmd = " ".join(stream[:index]) if index > 0 else " ".join(stream)
leaf = cmd.split('/')[-1]
if ' ' in leaf :
p = leaf.split(' ')
name = p[0]
args = " ".join(p[1:])
else:
name = leaf
args = " ".join(stream[index:]) if index > 0 else ""
return [name,cmd,args]
def to_pandas(self,m):
"""
This function will convert the output of ps to a data-frame
@param m raw matrix i.e list of values like a csv
"""
d = datetime.datetime.now().strftime('%m-%d-%Y')
t = datetime.datetime.now().strftime('%H:%M:%S')
m = [item for item in m if len(item) != len (m[0])]
m = "\n".join(m[1:])
df = pd.read_csv(pd.compat.StringIO(m),sep=self.xchar)
df['date'] = np.repeat(d,df.shape[0])
df['time'] = np.repeat(t,df.shape[0])
df['node'] = np.repeat(self.node,df.shape[0])
df.columns =['pid','user','mem','cpu','status','started','name','cmd','args','date','time','node']
return df
def empty(self,name):
return pd.DataFrame([{"pid":None,"user":None,"mem":0,"cpu":0,"status":"X","started":None,"name":name,"cmd":None,"args":None,"date":None,"time":None,"node":self.node}])
def parse(self,rows):
m = []
TIME_INDEX = 5
ARGS_INDEX = 6
for item in rows :
if rows.index(item) != 0 :
parts = item.split(self.xchar)
row = parts[:TIME_INDEX]
row.append(' '.join(parts[TIME_INDEX:ARGS_INDEX]))
row += self.get_app(parts[ARGS_INDEX:])
else:
row = item.split(self.xchar)
row = (self.xchar.join(row)).strip()
if len(row.replace(";","")) > 0 :
m.append(row)
return m
def get(self,**args):
"""
This function returns the output of the ps command to the calling code
The output is stored in a data-frame with columns
pid,user,mem,cpu,status,started,name,cmd,args,date,time,node
"""
try:
handler = subprocess.Popen(self.cmd,shell=True,stdout=subprocess.PIPE)
stream = handler.communicate()[0]
rows = stream.split('\n')
df = self.to_pandas(self.parse(rows))
r = pd.DataFrame()
if 'filter' in args :
pattern = "|".join(args['filter'])
i = df.cmd.str.contains(pattern)
r = df[i].copy()
r.index = np.arange(0,r.shape[0])
ii = (1 + np.array(i)*-1) == 1 #-- boolean complement of i i.e processes NOT matching the filter
other = pd.DataFrame(df[ii].sum()).T.copy()
other.index = np.arange(0,other.shape[0])
other.user = other.name = other.status = other.cmd = other.args = 'other'
other.started = other.pid = -1
other = other[other.columns[1:]]
for name in args['filter'] :
i = r.cmd.str.contains(str(name.strip()),case=False,na=False)
if i.sum() == 0:
r = r.append(self.empty(name),sort=False)
else :
pass
# r[i].update (pd.DataFrame({"name":np.repeat(name,r.shape[0])}))
r.loc[i, 'name'] = np.repeat(name,i.sum())
# r.loc[i].name = name
r = r.append(other,sort=False)
r.index = np.arange(r.shape[0])
return r
except Exception as e:
print (e)
return None
class Folders(SmartTop):
"""
This class will assess a folder and produce a report in a data-frame that can be later on used for summary statistics
"""
def __init__(self,**args):
SmartTop.__init__(self,**args)
def _get(self,dir_path,r=None):
if r is None :
r = [] #-- avoid sharing a mutable default argument across calls
for child in os.listdir(dir_path):
path = os.path.join(dir_path, child)
if os.path.isdir(path):
self._get(path,r)
else:
size = os.path.getsize(path)
file_date = os.path.getatime(path)
file_date = datetime.datetime.fromtimestamp(file_date)
now = datetime.datetime.now()
age = (now - file_date ).days
name = os.path.basename(path)
r.append({"name":name,"path":path,"size":size,"age":age,"date":now.strftime('%m-%d-%Y'),"time":now.strftime('%H:%M:%S'),"node":self.node })
return r
def get(self,**args):
# path = args['path']
if not isinstance(args['path'],list) :
paths = [args['path']]
else:
paths = args['path']
_out = pd.DataFrame()
for path in paths :
name = os.path.basename(path)
if os.path.exists(path) :
#
# If the folder does NOT exist it is skipped
#
rows = self._get(path)
if len(rows) > 0 :
r = pd.DataFrame(rows)
r = pd.DataFrame([{"name":name,"path":path,"files":r.shape[0],"age_in_days":r.age.mean(),"size_in_kb":r['size'].sum(),"date":r.date.max(),"time":r.time.max(),"node":r.node.max()}])
_out = _out.append(r,sort=False)
#
# @TODO: The state of the hard-drive would be a good plus
# os.system('df -h /')
_out.index = np.arange(0,_out.shape[0])
return _out

@ -1,368 +0,0 @@
"""
This class is designed to be an actor class i.e it will undertake certain actions when an event is detected
The platform has 2 main sections (detection & analysis).
Action Types (Actors):
- Alert : Sends an email or Webhook
- Apps : Kill, Start
- Folder: Archive, Delete (all, age, size)
By design we are to understand that a message is structured as follows:
{to,from,content} with content either being an arbitrary stream (or JSON)
@TODO:
- upgrade to python 3.x
"""
import json
from threading import Thread
import os
import shutil
import subprocess
import re
from monitor import ProcessCounter
from utils.transport import QueueListener, QueueWriter, QueueReader
from utils.params import PARAMS
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from datetime import datetime
from StringIO import StringIO
from utils.services import Dropbox, Google
class Actor():
@staticmethod
def instance(name,args,logger=None):
"""
This function is a singleton that acts as a factory object for all the instances of this subclass
@param name name of the class to instantiate
@param args arguments to be passed in {configuration}
"""
r = []
if not isinstance(name,list):
name = [name]
for id in name :
try:
o = eval("".join([id,"()"])) #-- instantiate the subclass by name (the class must be defined in this module)
o.Initialize(args,logger)
r.append(o)
except Exception as e:
if logger is not None :
logger.log(subject='Actor',object='Factory',action='error',value=str(e))
print str(e)
return r[0] if len(r) == 1 else r
def __init__(self):
"""
Initializing the class with configuration. The configuration will be specific to each subclass
@param args arguments the class needs to be configured
"""
pass
def getName(self):
return self.__class__.__name__.lower()
# def getIdentifier(self):
# return self.__class__.__name__.lower()
def Initialize(self,args,logger=None):
self.config = args
self.logger = logger
def isValid(self,**item):
return False
def execute(self,cmd):
try:
#-- fire and forget : we do not wait for the command to complete
subprocess.Popen(cmd,stdout=subprocess.PIPE)
except Exception as e:
pass
def post(self,**args):
pass
def log(self,**args):
if self.logger :
args['subject'] = self.getName()
self.logger.log(**args)
class Apps(Actor) :
"""
This class is designed to handle application, restart, if need be.
conf{app-name:{args}}
"""
def __init__(self):
Actor.__init__(self)
# self.ng = None
def isValid(self,**args):
"""
We ensure that the provided application exists and that the payload is correct
The class will only respond to reboot,kill,start actions
p validate the payload
q validate the app can be restarted
@NOTE: killing the application has no preconditions/requirements
"""
params = args['params']
action = args['action']
p = len(set(params.keys()) & set(['cmd','label'])) == 2
q = False
r = action in ['reboot','kill','start']
if p :
q = os.path.exists(params['cmd'])
return p and q and r
def init(self,action,params) :
"""
This function will initialize the actor with applications and associated arguments
@param args {"apps_o":"","app_x":params}
"""
self.action = action
self.params = params
self.log(action='init',object=action,value=params)
def startup(self,cmd) :
"""
This function is intended to start a program given the configuration
@TODO We need to find the command in case the app has crashed
"""
try:
os.system(cmd +" &")
self.log(action='startup',value=cmd)
except Exception as e:
print e
def kill(self,name) :
"""
kill processes given a name. Matching is case insensitive and partial names are accepted
@NOTE: Make sure the reference to the app is not ambiguous
"""
try:
args = "".join(['ps -eo pid,command|grep -Ei "',name.lower(),'"|grep -E "^ {0,}[0-9]+" -o|xargs kill -9'])
#self.execute([args])
subprocess.call([args],shell=True)
except Exception as e:
print e
def run(self):
__action = str(self.action).strip()
__params = dict(self.params)
pointer = None
if __action == 'reboot' :
def pointer():
self.kill(__params['label'])
self.startup(__params['cmd'])
elif __action == 'kill':
def pointer():
self.kill(__params['label'])
elif __action =='start':
def pointer() :
self.startup(__params['cmd'])
if pointer :
thread = Thread(target=pointer)
thread.start()
# pointer()
def analyze(self,logs) :
"""
This function is designed to analyze a few logs and take appropriate action
@param logs logs of application/process data; folder analysis or sandbox analysis
"""
pass
# for item in logs :
# name = item['label']
# if self.can_start(name) :
# self.startup(name)
# #
class Mailer (Actor):
"""
This class is a mailer agent
"""
def __init__(self):
"""
conf = {uid:<account>,host:<host>,port:<port>,password:<password>}
"""
Actor.__init__(self)
def init(self,conf) :
self.uid = conf['uid']
try:
self.handler = smtplib.SMTP_SSL(conf['host'],conf['port'])
r = self.handler.login(self.uid,conf['password'])
#
# @TODO: Check the status of the authentication
# If not authenticated the preconditions have failed
#
except Exception as e:
print str(e)
self.handler = None
pass
def send(self,**args) :
subject = args['subject']
message = args['message']
to = args['to']
if '<' in message and '>' in message :
message = MIMEText(message,'html')
else:
message = MIMEText(message,'plain')
message['From'] = self.uid
message['To'] = to
message['Subject'] = subject
return self.handler.sendmail(self.uid,to,message.as_string())
def close(self):
self.handler.quit()
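#
# Usage sketch (hypothetical SMTP account):
#
#   mailer = Mailer()
#   mailer.init({"uid":"ops@example.com","host":"smtp.example.com","port":465,"password":"..."})
#   mailer.send(to="admin@example.com",subject="alert",message="<b>disk almost full</b>")
#   mailer.close()
#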
class Folders(Actor):
def __init__(self):
Actor.__init__(self)
"""
This is designed to handle folders i.e cleaning/archiving the folders
If the user does NOT have any keys to cloud-view then she will not be able to archive
{threshold:value}
@param threshold threshold in terms of size or age; it will be applied to all folders
"""
def init(self,action,params):
"""
This is initialized with parameters from the plan.
The parameters should be specific to the actor (folder) e.g folder_size
"""
self.action = action
plan = params['plan']
self.threshold = self.get_size( plan['folder_size'])
self.params = params
def isValid(self,**args):
action = args['action']
params = args['params']
if not isinstance(action,list) :
action = [action] #-- set() of a raw string would yield characters, not the action name
p = len(set(action) & set(['clean','archive','backup'])) > 0
q = False
r = False
if p :
q = len(set(params.keys()) & set( ['label','folder'])) > 0
if q :
folder = params['label'] if 'label' in params else params['folder']
r = os.path.exists(folder)
return p and q and r
def archive(self,item):
"""
This function will archive all files in a given folder
@pre : isValid
"""
folder = item['label']
name = folder.split(os.sep)[-1]
date = str(datetime.now()).replace(' ','@')#''.join([str(i) for i in item['date'].values()])
# signature='-'.join([name,date,str(item['stats']['file_count']),'files'])
signature='-'.join([name,date])
tarball=os.sep.join([folder,'..',signature])
shutil.make_archive(tarball,'tar',folder)
#self.clean(item)
#
# @TODO: The archive can be uploaded to the cloud or else where
# @param id cloud service idenfier {dropbox,box,google-drive,one-drive}
# @param key authorization key for the given service
#
return tarball+".tar"
def backup(self,tarball):
"""
This function will initiate backup to the cloud given
"""
if os.path.exists(tarball) :
key = self.params['key']
sid = self.params['user']['sid']
if sid == 'dropbox' :
cloud = Dropbox()
elif sid == 'google-drive' :
cloud = Google()
cloud.init(key)
file = open(tarball,'rb')
out = cloud.upload('backup','application/octet-stream',file)
file.close()
print out
else:
#
# The tarball was not found or the service identifier is not supported; trace for debugging
#
print tarball
print self.params['user']['sid']
print self.params['key']
def clean(self,item):
"""
This function consists in deleting files from a given folder
"""
rpath = item['label']
files = os.listdir(item['label'])
for name in list(files) :
path = os.sep.join([item['label'],name])
if os.path.isdir(path) :
shutil.rmtree(path)
else:
os.remove(path)
#
#
def get_size(self,value):
"""
converts a size expression like "2 GB" into KB and returns the numeric value (without units)
"""
units = {'MB':1000,'GB':1000000,'TB':1000000000} #-- multipliers to KB
key = set(units.keys()) & set(re.split('(\d+)',value.replace(' ','').upper()))
if len(key) == 0:
return -1
key = key.pop()
return float(value.upper().replace(key,'').strip()) * units[key]
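#
# For instance get_size("2 GB") returns 2000000.0 and get_size("500MB") returns 500000.0 (both in KB);
# an expression with no recognized unit returns -1.
#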
def can_clean(self,item):
"""
This function returns whether the following hold :
p : folder exists
q : the threshold has been reached
@TODO: Add a user defined configuration element to make this happen
"""
p = os.path.exists(item['label']) and item['label'] in self.lfolders
q = item['stats']['size']['mean'] >= self.threshold and self.threshold > 0
return p and q
def run(self):
tarball = None
if 'archive' in self.action :
tarball = self.archive(self.params)
if 'backup' in self.action and tarball:
self.backup(tarball)
if 'delete' in self.action and self.can_clean(self.params):
self.clean(self.params) #-- both expect the folder descriptor; assuming params carries it
def analyze(self,logs):
r = {'clean':self.clean,'archive':self.archive}
self.lfolders = [ folder['label'] for folder in logs]
#for item in logs :
# if self.can_clean(item) :
# self.archive(item)
# self.clean(item)

@ -1,132 +0,0 @@
"""
This file encapsulates a class that is intended to perform learning
"""
from __future__ import division
import os
import json
import time
import numpy as np
from datetime import datetime
from sklearn import linear_model
from threading import Thread,RLock
from utils.transport import *
from utils.ml import AnomalyDetection,ML
from utils.params import PARAMS
class BaseLearner(Thread):
def __init__(self,lock) :
Thread.__init__(self)
path = PARAMS['path']
self.name = self.__class__.__name__.lower()
self.rclass= None
self.wclass= None
self.rw_args=None
if os.path.exists(path) :
f = open(path)
self.config = json.loads(f.read())
f.close()
self.rclass = self.config['store']['class']['read']
self.wclass = self.config['store']['class']['write']
self.rw_args = self.config['store']['args']
else:
self.config = None
self.lock = lock
self.factory = DataSourceFactory()
self.quit = False
"""
This function is designed to stop processing gracefully
"""
def stop(self):
self.quit = True
"""
This class is intended to apply anomaly detection to various areas of learning
The areas of learning that will be skipped are :
['_id','_rev','learn'] ...
@TODO:
- Find a way to perform dimensionality reduction if need be
"""
class Anomalies(BaseLearner) :
def __init__(self,lock):
BaseLearner.__init__(self,lock)
if self.config :
#
# Initializing data store & factory class
#
self.id = self.config['id']
self.apps = self.config['procs'] if 'procs' in self.config else []
self.rclass = self.config['store']['class']['read']
self.wclass = self.config['store']['class']['write']
self.rw_args = self.config['store']['args']
# self.factory = DataSourceFactory()
self.quit = False
# self.lock = lock
def format(self,stream):
pass
def stop(self):
self.quit = True
def run(self):
DELAY = self.config['delay'] * 60
reader = self.factory.instance(type=self.rclass,args=self.rw_args)
data = reader.read()
key = 'apps@'+self.id
if key in data:
rdata = data[key]
features = ['memory_usage','cpu_usage']
yo = {"1":["running"],"name":"status"}
while self.quit == False :
print ' *** ',self.name, ' ' , str(datetime.today())
for app in self.apps:
print '\t',app,str(datetime.today()),' ** ',app
logs = ML.Filter('label',app,rdata)
if logs :
handler = AnomalyDetection()
value = handler.learn(logs,'label',app,features,yo)
if value is not None:
value = dict(value,**{"features":features})
value = dict({"id":self.id},**value)
#r[id][app] = value
self.lock.acquire()
writer = self.factory.instance(type=self.wclass,args=self.rw_args)
writer.write(label='learn',row=value)
self.lock.release()
#
if 'MONITOR_CONFIG_PATH' in os.environ :
break
time.sleep(DELAY)
print ' *** Exiting ',self.name.replace('a','A')
"""
Let's estimate how many files we will have for a given date
y = ax + b with y: number files, x: date, y: Number of files
"""
class Regression(BaseLearner):
def __init__(self,lock):
BaseLearner.__init__(self,lock)
self.folders = self.config['folders']
self.id = self.config['id']
def run(self):
DELAY = self.config['delay'] * 60
reader = self.factory.instance(type=self.rclass,args=self.rw_args)
data = reader.read()
if 'folders' in data :
data = ML.Filter('id',self.id,data['folders'])
xo = ML.Extract(['date'],data)
yo = ML.Extract(['count'],data)
pass
# print np.var(xo,yo)
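#
# A minimal sketch of the intended fit (assumes xo holds numeric date ordinals and yo the file counts):
#
#   model = linear_model.LinearRegression()
#   model.fit(np.array(xo).reshape(-1,1),np.array(yo))
#   a,b = model.coef_[0],model.intercept_
#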
if __name__ == '__main__' :
lock = RLock()
thread = Anomalies(lock)
thread.start()

@ -1,220 +0,0 @@
"""
Features :
- data collection
- detection, reboot (service)
- respond to commands (service)
"""
#from threading import Thread, RLock
from __future__ import division
import os
import json
import time
from datetime import datetime
from utils.transport import *
import monitor
import requests
from threading import Thread
class Manager() :
def version(self):
return 1.1
"""
delay : <value>
limit : <value>
scope : apps,folders,learner,sandbox
"""
def __init__(self):
self.factory = DataSourceFactory()
def set(self,name,value):
setattr(self,name,value)
def init(self,**args) :
self.id = args['node']
self.agents = args['agents']
self.config = dict(args['config'])
self.key = args['key']
self.actors = args['actors']
self.plan = self.config['plan']
self.DELAY = int(self.plan['metadata']['delay'])
self.host = args['host']
self.update() #-- Initializing status information
_args={"host":"dev.the-phi.com","qid":self.id,"uid":self.key}
#
# Connecting to the messaging service
self.qlistener = self.factory.instance(type="QueueListener",args=_args)
self.qlistener.callback = self.callback
self.qlistener.init(self.id)
# self.qlistener.read()
thread = (Thread(target=self.qlistener.read))
thread.start()
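#
# Example of initialization (hypothetical values; config must carry the plan and the store settings):
#
#   manager = Manager()
#   manager.init(node='node-01',key='abc123',host='localhost',agents=[],actors=[],
#       config={'plan':{'metadata':{'delay':'10','limit':'1000','agents':'apps,folders','actors':'apps,folders'}},
#               'store':{'class':{'read':'<reader-class>','write':'<writer-class>'},'args':{}}})
#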
def update(self) :
"""
This method inspect the plans for the current account and makes sure it can/should proceed
The user must be subscribed and to the service otherwise this is not going to work
"""
url="http://:host/monitor/init/collector".replace(':host',self.host)
r = requests.post(url,headers={"key":self.key,"id":self.id})
r = json.loads(r.text)
# meta = [item['metadata'] for item in plans if item['status']=='active' ]
self.plan = r['plan']
meta = self.plan['metadata']
if meta :
self.DELAY = 60* int(meta['delay'])
self.LIMIT = int(meta['limit'])
#dbname = [item['name'] for item in plans if int(item['metadata']['limit']) == self.LIMIT][0]
#self.config['store']['args']['dbname'] = dbname
else:
self.DELAY = -1
self.LIMIT = -1
#self.filter(meta)
#
# We are removing all that is not necessary i.e making sure the features match the plan the user has paid for
#
self.agents = self.filter('agents',meta,self.agents)
self.actors = self.filter('actors',meta,self.actors)
self.setup(meta)
# def filter_collectors(self,meta) :
# """
# remove collectors that are not specified by the plan
# Note that the agents (collectors) have already been initialized ?
# """
# values = meta['agents'].replace(' ','').split(',')
# self.agents = [agent for agent in self.agents if agent.getName() in values]
# def filter_actors(self,meta):
# """
# removes actors that are NOT specified by the subscription plan
# Note that the actor have already been instatiated and need initialization
# """
# values = meta['actors'].replace(' ','').split('.')
# self.actors = [actor for actor in self.actors if actor.getName() in values]
def filter(self,id,meta,objects):
"""
This function filters the agents/actors given what is available in the user's plan
"""
values = meta[id].replace(' ','').split(',')
return [item for item in objects if item.getName() in values]
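#
# e.g. with meta = {'agents':'apps, folders'} only the objects whose getName() is 'apps' or 'folders' are kept
#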
def setup(self,meta) :
# conf = {"folders":None,"apps":None}
# read_class = self.config['store']['class']['read']
# read_args = self.config['store']['args']
# args = None
# couchdb = self.factory.instance(type=read_class,args=read_args)
# args = couchdb.view('config/apps',key=self.key)
# if len(args.keys()) > 0 :
# self.apply_setup('apps',args)
# args = couchdb.view('config/folders',key=self.key)
# if 'folder_size' not in meta :
# # args['threshold'] = meta['folder_size']
# self.apply_setup('folders',meta)
#self.apply_setup('folders',meta)
#@TODO: For now app actors don't need any particular initialization
pass
def apply_setup(self,name,args) :
for actor in self.actors :
if args is not None and actor.getName() == name and len(args.keys()) > 0:
actor.init(args)
def isvalid(self):
self.update()
return self.DELAY > -1 and self.LIMIT > -1
def post(self,label,row) :
"""
This function is designed to take appropriate action if a particular incident has been detected
@param label name of the agent that flagged the incident
@param row data pulled/extracted
"""
message = {}
message['action'] = 'reboot'
message['node'] = label
def callback(self,channel,method,header,stream):
"""
This function enables the manager to be able to receive messages and delegate them to the appropriate actor
@channel
"""
message = json.loads(stream)
#
# we should inspect the message and ensure it has reached the appropriate recipient
#
if 'node' in message and message['node'] == self.id :
action = message['action']
params = message['params']
# params['plan'] = self.plan['metadata']
self.delegate(action,params)
def delegate(self,action,params):
for actor in self.actors :
if actor.isValid(action=action,params=params) :
actor.init(action,params)
actor.run()
break
pass
def run(self):
#DELAY=35*60 #- 35 Minutes
#LIMIT=1000
COUNT = 0
COUNT_STOP = int(24*60/ self.DELAY)
write_class = self.config['store']['class']['write']
read_args = self.config['store']['args']
while True :
COUNT += 1
if COUNT > COUNT_STOP :
if self.isvalid() :
COUNT = 0
else:
break
for agent in self.agents :
data = agent.composite()
label = agent.getName()
node = '@'.join([label,self.id])
row = {}
if label == 'folders':
row = [ dict({"id":self.id}, **_row) for _row in data]
else:
#label = id
row = data
#
# @TODO:
# This data should be marked if it has been flagged for reboot
#
if type(row)==list and len(row) == 0 :
continue
#
#
index = self.agents.index(agent)
if len(self.actors) > index and self.actors[index].getName() == agent.getName() :
actor = self.actors[index]
actor.analyze(row)
# self.lock.acquire()
store = self.factory.instance(type=write_class,args=read_args)
store.flush(size=self.LIMIT)
store.write(label=node,row=[row])
# self.lock.release()
print (["Falling asleep ",self.DELAY/60])
time.sleep(self.DELAY)

@ -1,32 +0,0 @@
import sys
PARAMS = {'context':''}
if len(sys.argv) > 1:
N = len(sys.argv)
for i in range(1,N):
value = None
if sys.argv[i].startswith('--'):
key = sys.argv[i].replace('-','')
PARAMS[key] = 1
if i + 1 < N:
value = sys.argv[i + 1] = sys.argv[i+1].strip()
if key and value:
PARAMS[key] = value
if key == 'context':
PARAMS[key] = ('/'+value).replace('//','/')
import logging
import json
from datetime import datetime
class Logger :
@staticmethod
def init(filename):
name = "-".join([filename,datetime.now().strftime('%d-%m-%Y')])+".log"
logging.basicConfig(filename=name,level=logging.INFO,format="%(message)s")
@staticmethod
def log(**args) :
args['date'] = datetime.now().strftime('%d-%m-%Y %H:%M:%S')
logging.info(json.dumps(args))
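#
# Usage sketch (hypothetical): `python app.py --path config.json --context monitor`
# yields PARAMS == {'context':'/monitor','path':'config.json'}; then:
#
#   Logger.init('data-collector')            #-- logs to data-collector-<dd-mm-YYYY>.log
#   Logger.log(action='login',value='ok')    #-- one JSON object per line
#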

@ -1,598 +0,0 @@
"""
CloudView Engine 2.0
The Phi Technology LLC - Steve L. Nyemba <steve@the-phi.com>
This is a basic cloud view engine that is designed to be integrated into any service and intended to work for anyone provided they have signed up with the cloud service provider
The intent is to make the engine a general purpose engine that can be either deployed as a service (3-launchpad) or integrated as data feed for a third party utility
"""
from __future__ import division
from threading import Thread
import json
import requests
from xmljson import yahoo as bf
from xml.etree.ElementTree import Element, tostring, fromstring, ElementTree as ET
import xmltodict
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from StringIO import StringIO
class Cloud:
BYTES_TO_GB = 1000000000
Config = None
STREAMING_URI = None
@staticmethod
def instance(id,**args):
id = id.strip()
if id == 'skydrive' :
id = 'one-drive'
handler = None
path = args['path'] if 'path' in args else None
if not Cloud.Config and path:
f = open(path)
Cloud.Config = json.loads(f.read())
Cloud.STREAMING_URI = str(Cloud.Config['api'])
Cloud.Config = Cloud.Config['cloud']
f.close()
if path and id in Cloud.Config :
context = Cloud.Config[id]
className = context['class']
config = json.dumps(context['config'])
handler = eval( "".join([className,"(",config,")"]))
#
# In case a stream was passed in ...
#
if 'stream' in args:
stream = args['stream']
context = Cloud.Config[id]
className = context['class']
handler = eval("".join([className,"(None)"]))
handler.from_json(stream)
#
# Once the handler is provided we must retrieve the service given the key
# The key provides information about what files to extract as well as the preconditions
# @TODO:
# - Keys are maintained within the stripe account/couchdb
# -
return handler
def __init__(self):
self.access_token = None
self.refresh_token= None
self.files = []
self.client_id = None
self.secret = None
self.mfiles = {}
self.folders={}
def to_json(self):
object = {}
keys = vars(self)
for key in keys:
value = getattr(self,key)
object[key] = value
return json.dumps(object)
def from_json(self,stream):
ref = json.loads(stream) ;
for key in ref.keys() :
value = ref[key]
setattr(self,key,value)
# self.access_token = ref['access_token']
# self.refesh_token = ref['refresh_token']
# self.files = ref['files']
"""
This function matches a name with a list of possible features/extensions
"""
def match(self,filename,filters):
if isinstance(filters,str):
filters = [filters]
return len(set(filename.lower().split('.')) & set(filters)) > 0
def getName(self):
return self.__class__.__name__.lower()
def get_authURL(self):
config = Cloud.Config[self.getName()]['config']
url = config['authURL']
if '?' not in url:
url += '?'
keys=['client_id','redirect_uri']
p = []
for id in keys:
value = config[id]
p.append(id+'='+value)
url = url +"&"+ "&".join(p)
return url
Cloud.Config = {}
class Google(Cloud):
def __init__(self,conf=None):
Cloud.__init__(self)
if conf is not None: #-- cache the OAuth client credentials (assumption: same keys as the Box handler)
self.client_id,self.secret = conf['client_id'],conf['secret']
self.redirect_uri = conf.get('redirect_uri')
def getName(self):
return 'google-drive'
def init(self,token):
self.refresh_token = token
self._refresh()
def _refresh(self,code=None):
url = "https://accounts.google.com/o/oauth2/token"
headers = {"Content-Type":"application/x-www-form-urlencoded"}
data = {"client_id":self.client_id,"client_secret":self.secret}
if code :
grant_type = 'authorization_code'
data['code'] = code
else:
data['refresh_token'] = self.refresh_token
grant_type = 'refresh_token'
data['grant_type'] = grant_type
data['redirect_uri'] = self.redirect_uri
resp = requests.post(url,headers=headers,data=data)
r = json.loads(resp.text)
if 'access_token' in r:
self.access_token = r['access_token']
self.refresh_token = r['refresh_token'] if 'refresh_token' in r else r['access_token']
self.id_token = r['id_token']
def create_file(self,**args):
url = "https://www.googleapis.com/upload/drive/v2/files" ;
headers = {"Authorization":"Bearer "+self.access_token}
headers['Content-Type'] = args['mimetype']
params = args['params']
if 'data' not in args :
r = requests.post(url,params = params,headers=headers)
else:
data = args['data']
r = requests.post(url,data=data,params = params,headers=headers)
return r.json()
def update_metadata(self,id,metadata) :
url = "https://www.googleapis.com/drive/v2/files"
headers = {"Authorization":"Bearer "+self.access_token}
headers['Content-Type'] = 'application/json; charset=UTF-8'
if id is not None :
url += ("/"+id)
r = requests.put(url,json=metadata,headers=headers)
else:
# url += ("/?key="+self.secret)
r = requests.post(url,data=json.dumps(metadata),headers=headers)
return r.json()
def upload(self,folder,mimetype,file):
"""
This function will upload a file to a given folder and will provide
If the folder doesn't exist it will be created otherwise the references will be fetched
This allows us to avoid having to create several folders with the same name
"""
r = self.get_files(folder) #-- assumes a get_files lookup by folder name (not defined for Google in this file)
if len(r) == 0 :
info = {"name":folder, "mimeType":"application/vnd.google-apps.folder"}
r = self.update_metadata(None,{"name":folder,"title":folder, "mimeType":"application/vnd.google-apps.folder"})
else:
r = r[0]
parent = r
parent = {"kind":"drive#file","name":folder,"id":parent['id'],"mimeType":"application/vnd.google-apps.folder"}
r = self.create_file(data=file.read(),mimetype=mimetype,params={"uploadType":"media"})
info = {"title":file.filename,"description":"Create by Cloud View"}
info['parents'] = [parent]
r = self.update_metadata(r['id'],metadata=info)
return r
"""
This class is designed to allow users to interact with one-drive
"""
class OneDrive(Cloud):
def __init__(self,conf):
Cloud.__init__(self)
if conf is not None: #-- cache the OAuth client credentials (assumption: same keys as the Box handler)
self.client_id,self.secret = conf['client_id'],conf['secret']
self.redirect_uri = conf.get('redirect_uri')
def getName(self):
return 'one-drive'
def init(self,token):
self.refresh_token = token
self._refresh()
def _refresh(self,code=None):
url = "https://login.live.com/oauth20_token.srf"
#url="https://login.microsoftonline.com/common/oauth2/v2.0/token"
headers = {"Content-Type":"application/x-www-form-urlencoded"}
form = {"client_id":self.client_id,"client_secret":self.secret}
if code:
grant_type = 'authorization_code'
form['code'] = str(code)
else:
grant_type = 'refresh_token'
form['refresh_token'] = self.refresh_token
form['grant_type'] = grant_type
if self.redirect_uri:
form['redirect_uri'] = self.redirect_uri
r = requests.post(url,headers=headers,data=form)
r = json.loads(r.text)
if 'access_token' in r:
self.access_token = r['access_token']
self.refresh_token = r['refresh_token']
def upload(self,folder,mimetype,file):
"""
@param folder parent.id
@param name name of the file with extension
@param stream file content
"""
path = folder+"%2f"+file.filename
url = "https://apis.live.net/v5.0/me/skydrive/files/:name?access_token=:token".replace(":name",path).replace(":token",self.access_token) ;
header = {"Authorization": "Bearer "+self.access_token,"Content-Type":mimetype}
header['Content-Type']= mimetype
r = requests.put(url,header=header,files=file)
r = r.json()
return r
"""
This class uses dropbox version 2 API
"""
class Dropbox(Cloud):
def __init__(self):
Cloud.__init__(self)
def init(self,access_token):
self.access_token = access_token
def upload(self,folder,mimetype,file):
"""
@param folder parent.id
@param name name of the file with extension
@param stream file content
@TODO: This upload will only limit itself to 150 MB, it is possible to increase this size
"""
url = "https://content.dropboxapi.com/2/files/upload"
folder = folder if folder is not None else ""
path = "/"+folder+"/"+file.name.split('/')[-1]
path = path.replace("//","/")
header = {"Authorization":"Bearer "+self.access_token,"Content-Type":mimetype}
#header['autorename']= "false"
header['mode'] = "add"
#header['mute'] = "false"
header['Dropbox-API-Arg'] = json.dumps({"path":path})
r = requests.post(url,headers=header,data=file.read())
print r.text
r = r.json()
return r
"""
This class implements basic interactions with box (cloud service providers)
Available functionalities are: authentication, file access,share and stream/download
"""
class Box(Cloud) :
def __init__(self,conf):
Cloud.__init__(self);
if conf is not None:
self.client_id = conf['client_id']
self.secret = conf['secret']
self.redirect_uri = conf['redirect_uri'] if 'redirect_uri' in conf else None
def init(self,token):
self.refresh_token = token
def set(self,code) :
self._access(code)
return 1 if self.access_token else 0
def _access(self,code):
body = {"client_id":self.client_id,"client_secret":self.secret,"grant_type":"authorization_code","code":code,"redirect_uri":self.redirect_uri}
headers = {"Content-Type":"application/x-www-form-urlencoded"}
url = "https://app.box.com/api/oauth2/token"
r = requests.post(url,headers=headers,data=body)
r = json.loads(r.text)
if 'error' not in r:
self.access_token = r['access_token']
self.refresh_token= r['refresh_token']
def _refresh(self,authToken) :
body = {"client_id":self.client_id,"client_secret":self.secret,"grant_type":"refresh_token"}
url = "https://app.box.com/api/oauth2/token";
headers = {"Content-Type":"application/x-www-form-urlencoded"}
r = requests.post(url,headers=headers,data=body)
r = json.loads(r.text)
if 'error' not in r :
self.access_token = r['access_token']
def get_user(self):
url = "https://api.box.com/2.0/users/me"
headers = {"Authorization":"Bearer "+self.access_token}
r = requests.get(url,headers=headers)
r = json.loads(r.text)
if 'login' in r :
#BYTES_TO_GB = 1000000000
user = {"uii":r['name'],"uid":r['login']}
usage = {"size":r['space_amount']/Cloud.BYTES_TO_GB,"used":r['space_used']/Cloud.BYTES_TO_GB,"units":"GB"}
user['usage'] = usage
return user
else:
return None
def format(self,item) :
file = {"name":item['name'],"origin":"box","id":item['id'],"url":""}
meta = {"last_modified":item['content_modified_at']}
return file
def get_files(self,ext,url=None):
ext = " ".join(ext)
url = "https://api.box.com/2.0/search?query=:filter&type=file"
url = url.replace(":filter",ext)
headers = {"Authorization":"Bearer "+self.access_token}
r = requests.get(url,headers=headers) ;
r = json.loads(r.text)
if 'entries' in r:
#self.files = [ self.format(file) for file in r['entries'] if file['type'] == 'file' and 'id' in file]
for item in r['entries'] :
if item['type'] == 'file' and 'id' in item :
self.files.append( self.format(item))
else:
#
# We are dealing with a folder; its id is needed for uploads
#
self.folders[item['name']] = item["id"]
return self.files
def stream(self,url):
headers = {"Authorization":"Bearer "+self.access_token}
r = requests.get(url,headers=headers,stream=True)
yield r.content
def share(self,id):
url = "https://api.box.com/2.0/files/:id".replace(":id",id);
headers = {"Authorization":"Bearer "+self.access_token,"Content-Type":"application/json"}
body = {"shared_link":{"access":"open","permissions":{"can_download":True}}}
r = requests.put(url,headers=headers,data=json.dumps(body))
r = json.loads(r.text)
if 'shared_link' in r:
return r['shared_link']['download_url']
return None
def upload(self,folder,mimetype,file):
"""
@param folder parent.id
@param name name of the file with extension
@param stream file content
"""
if folder not in self.folders :
#
# Let us create the folder now
#
url = "https://api.box.com/2.0/folders"
header = {"Authorization":"Bearer "+self.access_token}
pid = self.folders["/"] if "/" in self.folders else self.folders[""]
data = {"parent":{"id":str(pid)}}
r = requests.post(url,headers=header,json=data)
r = r.json()
pid = r["id"]
else:
pid = self.folders[folder]
url = "https://upload.box.com/api/2.0/files/content"
header = {"Authorization Bearer ":self.access_token,"Content-Type":mimetype}
r = requests.post(url,header=header,file=file)
r = r.json()
return r
class SugarSync(Cloud):
def __init__(self,conf=None):
Cloud.__init__(self)
if conf is not None:
self.client_id = conf['app_id']
self.private_key = conf['private_key']
self.access_key = conf['access_key']
#self.access_token = None
#self.refresh_token= None
# self.redirect_uri = conf['redirect_uri'] if 'redirect_uri' in conf else None
#self.files = []
def init(self,token):
self.refresh_token = token
self._refresh()
def login(self,email,password):
xml = '<?xml version="1.0" encoding="UTF-8" standalone="yes"?><appAuthorization><username>:username</username><password>:password</password><application>:app_id</application><accessKeyId>:accesskey</accessKeyId><privateAccessKey>:privatekey</privateAccessKey></appAuthorization>'
xml = xml.replace(":app_id",self.client_id).replace(":privatekey",self.private_key).replace(":accesskey",self.access_key).replace(":username",email).replace(":password",password)
headers = {"Content-Type":"application/xml","User-Agent":"The Phi Technology"}
url = "https://api.sugarsync.com/app-authorization" #-- app-authorization endpoint (assumption; url was left undefined)
r = requests.post(url,headers=headers,data=xml)
self.refresh_token = r.headers['Location']
def _refresh(self):
xml = '<?xml version="1.0" encoding="UTF-8" standalone="yes"?><tokenAuthRequest><accessKeyId>:accesskey</accessKeyId><privateAccessKey>:privatekey</privateAccessKey><refreshToken>:authtoken</refreshToken></tokenAuthRequest>'
xml = xml.replace(":accesskey",self.access_key).replace(":privatekey",self.private_key).replace(":authtoken",self.refresh_token)
headers = {"Content-Type":"application/xml","User-Agent":"The Phi Technology LLC"}
url = "https://api.sugarsync.com/authorization"
r = requests.post(url,data=xml,headers=headers)
self.access_token = r.headers['Location']
def format(self,item):
file = {}
file['name'] = item['displayName']
file['url'] = item['fileData']
file['id'] = item['ref']
meta = {}
meta['last_modified'] = item['lastModified']
file['meta'] = meta
return file
def get_files(self,ext,url=None) :
if url is None:
url = "https://api.sugarsync.com/folder/:sc:3989243:2/contents";
headers = {"Authorization":self.access_token,"User-Agent":"The Phi Technology LLC","Content-Type":"application/xml;charset=utf-8"}
r = requests.get(url,headers=headers)
stream = r.text #.encode('utf-8')
r = xmltodict.parse(r.text)
if 'collectionContents' in r:
r = r['collectionContents']
#
# Extracting files in the current folder then we will see if there are any subfolders
# The parser has weird behaviors that leave inconsistent objects (field names)
# This means we have to filter it out by testing the item being processed
if 'file' in r:
if isinstance(r['file'],dict):
self.files += [ self.format(r['file']) ]
else:
#self.files += [self.format(item) for item in r['file'] if isinstance(item,(str, unicode)) == False and item['displayName'].endswith(ext)]
self.files += [self.format(item) for item in r['file'] if isinstance(item,(str, unicode)) == False and self.match(item['displayName'],ext)]
if 'collection' in r:
if isinstance(r['collection'],dict) :
#
# For some unusual reason the parser handles single instances as objects instead of collection
# @NOTE: This is a behavior that happens when a single item is in the collection
#
self.get_files(ext,r['collection']['contents'])
for item in r['collection'] :
if 'contents' in item:
if isinstance(item,(str, unicode)) == False:
self.files += self.get_files(ext,item['contents'])
#[ self.get_files(ext,item['contents']) for item in r['collection'] if item['type'] == 'folder']
return self.files
def get_user(self):
url = "https://api.sugarsync.com/user"
headers = {"Authorization":self.access_token,"User-Agent":"The Phi Technology LLC","Content-Type":"application/xml;charset=utf-8"}
r = requests.get(url,headers=headers)
r = xmltodict.parse(r.text)
r = r['user']
if 'username' in r and 'quota' in r:
user = {"uid":r['username'],"uii":r['nickname']}
size = long(r['quota']['limit'])
used = long(r['quota']['usage'])
usage = {"size":size/Cloud.BYTES_TO_GB,"used":used/Cloud.BYTES_TO_GB,"units":"GB"}
user['usage'] = usage
return user
else:
return None
def stream(self,url):
headers = {"Authorization":self.access_token}
r = requests.get(url,headers=headers,stream=True)
yield r.content
"""
This function will create a public link and share it to designated parties
"""
def share(self,id):
url = "https://api.sugarsync.com/file/:id".replace(":id",id);
xml = '<?xml version="1.0" encoding="UTF-8" ?><file><publicLink enabled="true"/></file>';
headers = {"Content-Type":"application/xml","Authorization":self.access_token,"User-Agent":"The Phi Technology LLC"}
r = requests.put(url,headers=headers,data=xml)
r = xmltodict.parse(r.text)
if 'file' in r:
return r['file']['publicLink']['content']+"?directDownload=true"
else:
return None
def upload(self,folder,mimetype,file):
name = folder+"/"+file.filename
xml = '<?xml version="1.0" encoding="UTF-8" ?><file><displayName>:name</displayName><mediaType>:type</mediaType></file>'
xml = xml.replace(':name',name).replace(':type',mimetype)
header = {"content-type":"application/xml","User-Agent":"The Phi Technology LLC"}
header['Authorization'] = self.access_token
r = requests.post(url,headers=header,files={"file":file},data=xml) #-- NOTE: `url` must be the target folder's contents endpoint (assumption; left undefined here)
pass
class iTunes(Cloud):
def __init__(self):
Cloud.__init__(self)
self.url_topsongs = "http://ax.itunes.apple.com/WebObjects/MZStoreServices.woa/ws/RSS/topsongs/limit=:limit/explicit=false/json"
self.url_search = "http://itunes.apple.com/search?term=:keyword&limit=:limit&media=music"
def parse_search(self,obj):
files = []
try:
logs = obj['results']
for item in logs :
file = {}
file['id'] = item['trackId']
file['name'] = item['trackName']
file['id3'] = {}
file['id3']['track'] = item['trackName']
file['id3']['title'] = item['trackName']
file['id3']['artist']= item['artistName']
file['id3']['album'] = item['collectionName']
file['id3']['genre'] = item['primaryGenreName']
file['id3']['poster']= item['artworkUrl100']
file['url'] = item['previewUrl']
files.append(file)
except Exception as e:
print e
return files
def parse_chart(self,obj):
"""
This function will parse the topsongs feed returned by the itunes API
"""
files = []
try:
logs = obj['feed']['entry']
if isinstance(logs,dict) :
logs = [logs]
for item in logs :
file = {'name':item['im:name']['label'],'id3':{}}
file['id'] = item['id']['attributes']['im:id']
file['id3'] = {}
file['id3']['artist'] = item['im:artist']['label']
file['id3']['track'] = item['title']['label']
file['id3']['title'] = item['title']['label']
file['id3']['album'] = item['im:collection']['im:name']['label']
file['id3']['genre'] = item['category']['attributes']['term']
index = len(item['im:image'])-1
file['id3']['poster'] = item['im:image'][index]['label']
url = [link['attributes']['href'] for link in item['link'] if 'im:assetType' in link['attributes'] and link['attributes']['im:assetType']=='preview']
if len(url) > 0:
url = url[0]
file['url'] = url #item['link'][1]['attributes']['href'] //'im:assetType' == 'preview' and 'im:duration' is in the sub-item
files.append(file)
else:
continue
except Exception as e:
print e
#
# @TODO: Log the error somewhere to make it useful
return files
def parse(self,obj) :
if 'feed' in obj and 'entry' in obj['feed']:
return self.parse_chart(obj)
elif 'results' in obj :
return self.parse_search(obj)
else:
return []
def get_files(self,keyword=None,limit="1") :
url = self.url_search if keyword is not None else self.url_topsongs
keyword = "" if keyword is None else keyword
# limit = "50" if keyword == "" else "1"
url = url.replace(":keyword",keyword.replace(' ','+')).replace(':limit',limit)
r = requests.get(url)
r= r.json()
return self.parse(r)
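#
# Usage sketch:
#
#   store = iTunes()
#   hits = store.get_files(keyword='jazz',limit='5')   #-- search results with id3 metadata
#   charts = store.get_files(limit='10')               #-- topsongs feed
#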

@ -1,709 +0,0 @@
"""
This file implements data transport structures in order to allow data to be moved to and from anywhere
We can thus read data from disk and write to the cloud, a queue, couchdb or SQL
"""
from flask import request, session
import os
import pika
import json
import numpy as np
from couchdbkit import Server
import re
from csv import reader
from datetime import datetime
"""
@TODO: Write a process by which the class automatically handles reading and creating a preliminary sample and discovers the meta data
"""
class Reader:
def __init__(self):
self.nrows = 0
self.xchar = None
def row_count(self):
content = self.read()
return np.sum([1 for row in content])
"""
This function determines the most common delimiter from a subset of possible delimiters. It uses a statistical approach to gauge the distribution of columns for a given delimiter
"""
def delimiter(self,sample):
m = {',':[],'\t':[],'|':[],'\x3A':[]}
delim = m.keys()
for row in sample:
for xchar in delim:
if len(row.split(xchar)) > 1:
m[xchar].append(len(row.split(xchar)))
else:
m[xchar].append(0)
#
# The delimiter with the smallest variance, provided the mean is greater than 1
# This would be troublesome if there are many broken records sampled
#
m = {id: np.var(m[id]) for id in m.keys() if m[id] != [] and int(np.mean(m[id]))>1}
index = m.values().index( min(m.values()))
xchar = m.keys()[index]
return xchar
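#
# For instance, given the sample rows "a,b,c" and "1,2,3" the column counts for ','
# are [3,3] (mean 3, variance 0), so ',' is preferred over '\t' or '|'.
#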
"""
This function determines the number of columns of a given sample
@pre self.xchar is not None
"""
def col_count(self,sample):
m = {}
i = 0
for row in sample:
row = self.format(row)
id = str(len(row))
#id = str(len(row.split(self.xchar)))
if id not in m:
m[id] = 0
m[id] = m[id] + 1
index = m.values().index( max(m.values()) )
ncols = int(m.keys()[index])
return ncols;
"""
This function will clean records of a given row by removing non-ascii characters
@pre self.xchar is not None
"""
def format (self,row):
if isinstance(row,list) == False:
#
# We've observed sometimes fields contain delimiter as a legitimate character, we need to be able to account for this and not tamper with the field values (unless necessary)
cols = self.split(row)
#cols = row.split(self.xchar)
else:
cols = row ;
return [ re.sub('[^\x00-\x7F,\n,\r,\v,\b,]',' ',col.strip()).strip().replace('"','') for col in cols]
#if isinstance(row,list) == False:
# return (self.xchar.join(r)).format('utf-8')
#else:
# return r
"""
This function performs a split of a record and tries to attempt to preserve the integrity of the data within i.e accounting for the double quotes.
@pre : self.xchar is not None
"""
def split (self,row):
pattern = "".join(["(?:^|",self.xchar,")(\"(?:[^\"]+|\"\")*\"|[^",self.xchar,"]*)"])
return re.findall(pattern,row.replace('\n',''))
class Writer:
def format(self,row,xchar):
if xchar is not None and isinstance(row,list):
return xchar.join(row)+'\n'
elif xchar is None and isinstance(row,dict):
row = json.dumps(row)
return row
"""
It is important to be able to archive data so as to ensure that growth is controlled
Nothing in nature grows indefinitely, and neither should the data being handled.
"""
def archive(self):
pass
def flush(self):
pass
"""
This class is designed to read data from an Http request file handler provided to us by flask
The file will be held in memory and processed accordingly
NOTE: This is inefficient and can crash a micro-instance (be careful)
"""
class HttpRequestReader(Reader):
def __init__(self,**params):
self.file_length = 0
try:
#self.file = params['file']
#self.file.seek(0, os.SEEK_END)
#self.file_length = self.file.tell()
#print 'size of file ',self.file_length
self.content = params['file'].readlines()
self.file_length = len(self.content)
except Exception as e:
print "Error ... ",e
pass
def isready(self):
return self.file_length > 0
def read(self,size =-1):
i = 1
for row in self.content:
i += 1
if size == i:
break
yield row
"""
This class is designed to write data to a session/cookie
"""
class HttpSessionWriter(Writer):
"""
@param key required session key
"""
def __init__(self,**params):
self.session = params['queue']
self.session['sql'] = []
self.session['csv'] = []
self.tablename = re.sub('\..+$','',params['filename']) #-- strip the file extension
self.session['uid'] = params['uid']
#self.xchar = params['xchar']
def format_sql(self,row):
values = "','".join([col.replace('"','').replace("'",'') for col in row])
return "".join(["INSERT INTO :table VALUES('",values,"');\n"]).replace(':table',self.tablename)
def isready(self):
return True
def write(self,**params):
label = params['label']
row = params ['row']
if label == 'usable':
self.session['csv'].append(self.format(row,','))
self.session['sql'].append(self.format_sql(row))
"""
This class is designed to read data from disk (location on hard drive)
@pre : isready() == True
"""
class DiskReader(Reader) :
"""
@param path absolute path of the file to be read
"""
def __init__(self,**params):
Reader.__init__(self)
self.path = params['path'] ;
def isready(self):
return os.path.exists(self.path)
"""
This function reads the rows from a designated location on disk
@param size number of rows to be read, -1 suggests all rows
"""
def read(self,size=-1):
f = open(self.path,'rU')
i = 1
for row in f:
i += 1
if size == i:
break
yield row
f.close()
"""
This function writes output to disk in a designated location
"""
class DiskWriter(Writer):
def __init__(self,**params):
if 'path' in params:
self.path = params['path']
else:
self.path = None
if 'name' in params:
self.name = params['name'];
else:
self.name = None
if self.path is not None and os.path.exists(self.path) == False:
os.mkdir(self.path)
"""
This function determines if the class is ready for execution or not
i.e it determines if the preconditions are met prior to execution
"""
def isready(self):
p = self.path is not None and os.path.exists(self.path)
q = self.name is not None
return p and q
"""
This function writes a record to a designated file
@param label <passed|broken|fixed|stats>
@param row row to be written
"""
def write(self,**params):
label = params['label']
row = params['row']
xchar = None
if 'xchar' in params:
xchar = params['xchar']
path = ''.join([self.path,os.sep,label])
if os.path.exists(path) == False:
os.mkdir(path) ;
path = ''.join([path,os.sep,self.name])
f = open(path,'a')
row = self.format(row,xchar);
f.write(row)
f.close()
"""
This class hierarchy is designed to handle interactions with a queue server using pika framework (our tests are based on rabbitmq)
"""
class MessageQueue:
def __init__(self,**params):
self.host= params['host']
self.uid = params['uid']
self.qid = params['qid']
def isready(self):
#self.init()
resp = self.connection is not None and self.connection.is_open
self.close()
return resp
def close(self):
if self.connection.is_closed == False :
self.channel.close()
self.connection.close()
"""
This class is designed to publish content to an AMQP (Rabbitmq)
The class will rely on pika to implement this functionality
We will publish information to a given queue for a given exchange
"""
class QueueWriter(MessageQueue,Writer):
def __init__(self,**params):
#self.host= params['host']
#self.uid = params['uid']
#self.qid = params['queue']
MessageQueue.__init__(self,**params);
def init(self,label=None):
properties = pika.ConnectionParameters(host=self.host)
self.connection = pika.BlockingConnection(properties)
self.channel = self.connection.channel()
self.info = self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True)
if label is None:
self.qhandler = self.channel.queue_declare(queue=self.qid,durable=True)
else:
self.qhandler = self.channel.queue_declare(queue=label,durable=True)
self.channel.queue_bind(exchange=self.uid,queue=self.qhandler.method.queue)
"""
This function writes a stream of data to a given queue
@param object object to be written (will be converted to JSON)
@TODO: make this less chatty
"""
def write(self,**params):
xchar = None
if 'xchar' in params:
xchar = params['xchar']
object = self.format(params['row'],xchar)
label = params['label']
self.init(label)
_mode = 2
if isinstance(object,str):
stream = object
_type = 'text/plain'
else:
stream = json.dumps(object)
if 'type' in params :
_type = params['type']
else:
_type = 'application/json'
self.channel.basic_publish(
exchange=self.uid,
routing_key=label,
body=stream,
properties=pika.BasicProperties(content_type=_type,delivery_mode=_mode)
);
self.close()
def flush(self,label):
self.init(label)
_mode = 1 #-- Non persistent
self.channel.queue_delete( queue=label);
self.close()
"""
This class will read from a queue provided an exchange, queue and host
@TODO: Account for security and virtualhosts
"""
class QueueReader(MessageQueue,Reader):
"""
@param host host
@param uid exchange identifier
@param qid queue identifier
"""
def __init__(self,**params):
#self.host= params['host']
#self.uid = params['uid']
#self.qid = params['qid']
MessageQueue.__init__(self,**params);
if 'durable' in params :
self.durable = True
else:
self.durable = False
self.size = -1
self.data = {}
def init(self,qid):
properties = pika.ConnectionParameters(host=self.host)
self.connection = pika.BlockingConnection(properties)
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True)
self.info = self.channel.queue_declare(queue=qid,durable=True)
"""
This is the callback function designed to process the data stream from the queue
"""
def callback(self,channel,method,header,stream):
r = []
if re.match("^\{|\[",stream) is not None:
r = json.loads(stream)
else:
r = stream
qid = self.info.method.queue
if qid not in self.data :
self.data[qid] = []
self.data[qid].append(r)
#
# We stop reading when all the messages of the queue have been consumed
#
if self.size == len(self.data[qid]) or len(self.data[qid]) == self.info.method.message_count:
self.close()
"""
This function will read, the first message from a queue
@TODO:
Implement channel.basic_get in order to retrieve a single message at a time
Have the number of messages retrieved be specified by size (parameter)
"""
def read(self,size=-1):
self.size = size
#
# We enabled the reader to be able to read from several queues (sequentially for now)
# The qid parameter will be an array of queues the reader will be reading from
#
if isinstance(self.qid,basestring) :
self.qid = [self.qid]
for qid in self.qid:
self.init(qid)
if self.info.method.message_count > 0:
self.channel.basic_consume(self.callback,queue=qid,no_ack=False)
self.channel.start_consuming()
return self.data
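#
# A minimal usage sketch for QueueReader, kept commented out; the values are hypothetical.
# read() drains the pending messages and returns them keyed by queue name.
#
# qr = QueueReader(host='localhost',uid='demo.exchange',qid='demo.queue')
# messages = qr.read() #-- e.g. {'demo.queue': [...]}
# sample = QueueReader(host='localhost',uid='demo.exchange',qid='demo.queue').read(size=2)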
class QueueListener(QueueReader):
def init(self,qid):
properties = pika.ConnectionParameters(host=self.host)
self.connection = pika.BlockingConnection(properties)
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True )
self.info = self.channel.queue_declare(passive=True,exclusive=True,queue=qid)
self.channel.queue_bind(exchange=self.uid,queue=self.info.method.queue,routing_key=qid)
def read(self):
self.init(self.qid)
self.channel.basic_consume(self.callback,queue=self.qid,no_ack=True)
self.channel.start_consuming()
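#
# A minimal QueueListener sketch, kept commented out; the values are hypothetical.
# Note that init() declares the queue passively, so it must already exist on the broker,
# and read() blocks while messages accumulate in self.data.
#
# ql = QueueListener(host='localhost',uid='demo.exchange',qid='node-01')
# ql.read() #-- blocks; messages accumulate in ql.data['node-01']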
"""
This class is designed to write output as sql insert statements
The class will inherit from DiskWriter with minor adjustments
@TODO: Include script to create the table if need be using the upper bound of a learner
"""
class SQLDiskWriter(DiskWriter):
def __init__(self,**args):
DiskWriter.__init__(self,**args)
self.tablename = re.sub(r'\..+$','',self.name).replace(' ','_')
"""
@param label
@param row
@param xchar
"""
def write(self,**args):
label = args['label']
row = args['row']
if label == 'usable':
#
# NOTE: quotes are stripped from the values as a naive escape; this is NOT injection-safe
#
values = "','".join([col.replace('"','').replace("'",'') for col in row])
row = "".join(["INSERT INTO :table VALUES('",values,"');\n"]).replace(':table',self.tablename)
args['row'] = row
DiskWriter.write(self,**args)
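#
# A minimal SQLDiskWriter sketch, kept commented out. The constructor argument is
# hypothetical; it assumes DiskWriter derives self.name from the arguments it is given.
#
# sw = SQLDiskWriter(path='logs/repair.sql')
# sw.write(label='usable',row=['2020-01-01','node-01','ok'])
# #-- appends: INSERT INTO repair VALUES('2020-01-01','node-01','ok');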
class Couchdb:
"""
@param uri host & port reference
@param uid user id involved
@param dbname database name (target)
"""
def __init__(self,**args):
uri = args['uri']
self.uid = args['uid']
dbname = args['dbname']
self.server = Server(uri=uri)
self.dbase = self.server.get_db(dbname)
if not self.dbase.doc_exist(self.uid):
self.dbase.save_doc({"_id":self.uid})
"""
Insuring the preconditions are met for processing
"""
def isready(self):
if self.server.info() == {} or self.dbase.dbname not in self.server.all_dbs():
return False
#
# At this point we are sure that the server is connected
# We are also sure that the database actually exists
#
return self.dbase.doc_exist(self.uid)
def view(self,id,**args):
r =self.dbase.view(id,**args)
r = r.all()
return r[0]['value'] if len(r) > 0 else []
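#
# A minimal Couchdb sketch, kept commented out. The uri/uid/dbname values and the
# 'lookup/by_label' design-document view are hypothetical.
#
# db = Couchdb(uri='http://localhost:5984',uid='node-01',dbname='monitor')
# if db.isready():
#     rows = db.view('lookup/by_label',key='apps')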
"""
This function will read an attachment from couchdb and return it to calling code. The attachment must have been placed before hand (otherwise oops)
@T: Account for security & access control
"""
class CouchdbReader(Couchdb,Reader):
"""
@param filename filename (attachment)
"""
def __init__(self,**args):
#
# setting the basic parameters for the database connection
Couchdb.__init__(self,**args)
if 'filename' in args :
self.filename = args['filename']
else:
self.filename = None
def isready(self):
#
# Is the basic information about the database valid
#
p = Couchdb.isready(self)
if not p:
return False
#
# The database name is set and correct at this point
# We ensure the document of the given user has the requested attachment.
#
doc = self.dbase.get(self.uid)
if '_attachments' in doc:
r = self.filename in doc['_attachments'].keys()
else:
r = False
return r
def stream(self,size=-1):
content = self.dbase.fetch_attachment(self.uid,self.filename).split('\n')
i = 1
for row in content:
yield row
if size > 0 and i == size:
break
i = i + 1
def read(self,size=-1):
if self.filename is not None:
return self.stream(size)
else:
return self.basic_read()
def basic_read(self):
document = self.dbase.get(self.uid)
del document['_id'], document['_rev']
return document
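#
# A minimal CouchdbReader sketch, kept commented out; the values are hypothetical.
#
# reader = CouchdbReader(uri='http://localhost:5984',uid='node-01',dbname='monitor',filename='log.csv')
# if reader.isready():
#     for line in reader.read():
#         print line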
"""
This class will write on a couchdb document provided a scope
The scope is the attribute that will be on the couchdb document
"""
class CouchdbWriter(Couchdb,Writer):
"""
@param uri host & port reference
@param uid user id involved
@param filename filename (attachment)
@param dbname database name (target)
"""
def __init__(self,**args):
#
# The base class connects to the server, opens the database and creates
# the document identified by uid if it doesn't exist yet
#
Couchdb.__init__(self,**args)
if 'filename' in args:
self.filename = args['filename']
else:
self.filename = None
"""
write a given attribute to a document database
@param label scope of the row repair|broken|fixed|stats
@param row row to be written
"""
def write(self,**params):
document = self.dbase.get(self.uid)
label = params['label']
if 'row' in params :
row = params['row']
row_is_list = isinstance(row,list)
if label not in document :
document[label] = row if row_is_list else [row]
elif isinstance(document[label][0],list) :
document[label] += row
else:
document[label].append(row)
else :
if label not in document :
document[label] = {}
if isinstance(params['data'],dict) :
document[label] = dict(document[label],**params['data'])
else:
document[label] = params['data']
self.dbase.save_doc(document)
def flush(self,**params) :
size = params['size'] if 'size' in params else 0
document = self.dbase.get(self.uid)
for key in document:
if key in ['_id','_rev','_attachments'] :
continue
content = document[key]
if isinstance(content,list) and size > 0:
#-- keep only the last <size> entries
document[key] = content[len(content) - size:]
else:
document[key] = {}
self.dbase.save_doc(document)
def archive(self,params=None):
document = self.dbase.get(self.uid)
content = {}
_doc = {}
for id in document:
if id in ['_id','_rev','_attachments'] :
_doc[id] = document[id]
else:
content[id] = document[id]
content = json.dumps(content)
document= _doc
now = str(datetime.today())
name = '-'.join([document['_id'],now]) + '.json'
self.dbase.save_doc(document)
self.dbase.put_attachment(document,content,name,'application/json')
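#
# A minimal CouchdbWriter sketch, kept commented out; the values are hypothetical.
#
# writer = CouchdbWriter(uri='http://localhost:5984',uid='node-01',dbname='monitor')
# writer.write(label='apps',row={"name":"python","cpu":0.4})
# writer.flush(size=100) #-- keep only the last 100 entries per label
# writer.archive() #-- move the document body into a timestamped JSON attachment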
"""
This class acts as a factory to be able to generate an instance of a Reader/Writer
Against a Queue,Disk,Cloud,Couchdb
The class doesn't enforce parameter validation, thus any error with the parameters sent will result in a null Object
"""
class DataSourceFactory:
def instance(self,**args):
source = args['type']
params = args['args']
anObject = None
if source in ['HttpRequestReader','HttpSessionWriter']:
#
# @TODO: Make sure objects are serializable, be smart about them !!
#
aClassName = ''.join([source,'(**params)'])
else:
stream = json.dumps(params)
aClassName = ''.join([source,'(**',stream,')'])
try:
#
# NOTE: eval on a constructed string; only trusted class names should ever reach this point
#
anObject = eval(aClassName)
except Exception as e:
print(['Error ',e])
return anObject
"""
This class implements a data-source handler that is intended to be used within the context of data processing, it allows to read/write anywhere transparently.
The class is a facade to a heterogeneous class hierarchy and thus simplifies how the calling code interacts with the class hierarchy
"""
class DataSource:
def __init__(self,sourceType='Disk',outputType='Disk',params={}):
#-- instance() is an instance method, so a factory object is needed
factory = DataSourceFactory()
self.Input = factory.instance(type=sourceType,args=params)
self.Output = factory.instance(type=outputType,args=params)
def read(self,size=-1):
return self.Input.read(size)
def write(self,**args):
self.Output.write(**args)
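#
# A minimal DataSource sketch, kept commented out; the parameter values are hypothetical.
# Note that the same params dict is handed to both the input and the output source.
#
# p = {'host':'localhost','uid':'demo.exchange','qid':'repair'}
# ds = DataSource(sourceType='QueueReader',outputType='QueueWriter',params=p)
# data = ds.read()
# ds.write(label='repair',row={"fixed":True})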
#p = {}
#p['host'] = 'dev.the-phi.com'
#p['uid'] = 'nyemba@gmail.com'
#p['qid'] = 'repair'
#factory = DataSourceFactory()
#o = factory.instance(type='QueueReader',args=p)
#print o is None
#q = QueueWriter(host='dev.the-phi.com',uid='nyemba@gmail.com',qid='repair')
#q.write(label='repair',row='steve')
#q.write(label='repair',row='nyemba')
#q.write(label='repair',row='elon')