# --- file: src/utils/agents/actor.py ---
"""
    Actor hierarchy: undertakes actions when the detection side of the
    platform raises an event. The platform has 2 main sections (detection & analysis).

    Action Types (Actors):
        - Alert : Sends an email or Webhook
        - Apps  : Kill, Start
        - Folder: Archive, Delete (all, age, size)

    By design a message is structured as {to, from, content} with content
    either being an arbitrary stream (or JSON).
"""
import json
from threading import Thread
import os
import shutil
import subprocess
import re
from monitor import ProcessCounter
from utils.transport import QueueListener, QueueWriter, QueueReader
from utils.params import PARAMS
from ngram import NGram as ng
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText


class Actor(object):
    @staticmethod
    def instance(name, args):
        """
        Factory for Actor subclasses.
        @param name name of the class to instantiate (must be defined in this module)
        @param args configuration passed to the new instance's init()
        @return a configured instance, or None on failure
        """
        o = None
        try:
            # Resolve the class in the module namespace instead of eval()'ing
            # arbitrary text -- same behaviour, no code-injection surface.
            cls = globals()[name]
            o = cls()
            o.init(args)
        except Exception as e:
            print(str(e))
        return o

    def __init__(self):
        """Subclass-specific configuration happens in init(), not here."""
        pass

    def getName(self):
        """Lower-cased class name, used as the actor identifier."""
        return self.__class__.__name__.lower()

    def init(self, args):
        """Store the configuration; subclasses extend this."""
        self.config = args

    def isValid(self, item):
        return False

    def execute(self, cmd):
        """
        Launch *cmd* (a list: program + arguments) without a shell.
        Errors are deliberately swallowed: actions are best-effort.
        """
        try:
            out = subprocess.Popen(cmd, stdout=subprocess.PIPE)
            print(out)
        except Exception:
            pass

    def post(self, **args):
        pass


class Apps(Actor):
    """
    Handles applications: restart them if need be.
    Configuration: {app-name: args-string}
    """
    def __init__(self):
        Actor.__init__(self)
        self.ng = None   # NGram index over configured app names

    def init(self, config):
        """
        @param config {"app_x": params, ...} -- keys are indexed in an NGram
        structure so lookups tolerate partial / fuzzy names.
        """
        Actor.init(self, config)
        self.ng = ng(self.config.keys())

    def can_start(self, name):
        """
        Return True when *name* fuzzily matches a configured application
        (similarity above a small floor).
        """
        items = self.ng.search(name) if self.ng is not None else []
        if len(items) == 0:
            return False
        return items[0][1] > 0.01

    def startup(self, name):
        """
        Start the configured program that best matches *name*.
        """
        app = self.ng.search(name)[0][0]
        args = self.config[app]
        # the original also built an unused shell command string here
        self.execute([app, args])

    def kill(self, name):
        """
        Kill processes given the name; not case sensitive, partial names accepted.
        @NOTE: Make sure the reference to the app is not ambiguous.
        WARNING: *name* is interpolated into a shell pipeline (shell=True);
        callers must never pass untrusted input here.
        """
        args = "".join(['ps -eo pid,command|grep -E -i "', name.lower(),
                        '"|grep -E "^ {0,}[0-9]+" -o|xargs kill -9'])
        subprocess.call([args], shell=True)

    def analyze(self, logs):
        """
        Inspect process logs and restart anything that should be running.
        @param logs list of {label: app-name, ...} rows
        """
        for item in logs:
            name = item['label']
            if self.can_start(name):
                self.startup(name)
class Mailer(Actor):
    """
    Mailer agent: sends notification emails over SMTP-SSL.
    Configuration: {uid:, host:, port:, password:}
    """
    def __init__(self):
        Actor.__init__(self)

    def init(self, conf):
        self.uid = conf['uid']
        try:
            self.handler = smtplib.SMTP_SSL(conf['host'], conf['port'])
            r = self.handler.login(self.uid, conf['password'])
            #
            # @TODO: Check the status of the authentication (r) --
            # if not authenticated the preconditions of send() have failed.
            #
        except Exception as e:
            print(str(e))
            self.handler = None

    def send(self, **args):
        """
        Send a message; HTML is auto-detected by the presence of tags.
        @param subject subject line
        @param message body (plain text or HTML)
        @param to recipient address
        """
        subject = args['subject']
        message = args['message']
        to = args['to']
        if '<' in message and '>' in message:
            message = MIMEText(message, 'html')
        else:
            message = MIMEText(message, 'plain')
        message['From'] = self.uid
        message['To'] = to
        message['Subject'] = subject
        return self.handler.sendmail(self.uid, to, message.as_string())

    def close(self):
        self.handler.quit()


class Folders(Actor):
    """
    Handles folders i.e. cleaning/archiving them once they exceed a
    size threshold. Without cloud keys the user cannot archive remotely.
    Configuration: {threshold: "<number> MB|GB|TB"} -- applied to all folders.
    """
    def __init__(self):
        Actor.__init__(self)

    def init(self, args):
        # threshold normalized to KB via get_size()
        self.threshold = self.get_size(args['threshold'])

    def archive(self, item):
        """
        Tar the folder's content next to it, then clean the folder.
        @pre can_clean(item)
        """
        folder = item['label']
        name = folder.split(os.sep)[-1]
        signature = '-'.join([name, str(item['date']), str(item['count']), 'files'])
        tarball = os.sep.join([folder, '..', signature])
        shutil.make_archive(tarball, 'tar', folder)
        self.clean(item)
        #
        # @TODO: The archive can be uploaded to the cloud or elsewhere
        # @param id  cloud service identifier {dropbox,box,google-drive,one-drive}
        # @param key authorization key for the given service
        #

    def clean(self, item):
        """Delete every file/subfolder of the given folder."""
        files = os.listdir(item['label'])
        for name in list(files):
            path = os.sep.join([item['label'], name])
            if os.path.isdir(path):
                shutil.rmtree(path)
            else:
                os.remove(path)

    def get_size(self, value):
        """
        Convert a size expression like "10 MB" / "2GB" into KB.
        @return numeric size in KB, or -1 when no recognised unit is found
        """
        units = {'MB': 1000, 'GB': 1000000, 'TB': 1000000000}  # converting to kb
        key = set(units.keys()) & set(re.split(r'(\d+)', value.replace(' ', '').upper()))
        if len(key) == 0:
            return -1
        key = key.pop()
        # strip the unit that was actually detected -- the original stripped
        # 'MB' unconditionally, which crashed float() on GB/TB values
        number = value.upper().replace(key, '').strip()
        return float(number) * units[key]

    def can_clean(self, item):
        """
        p : folder exists and is one we monitor
        q : it has reached the size threshold
        """
        p = os.path.exists(item['label']) and item['label'] in self.lfolders
        q = self.get_size(item['size']) >= self.threshold
        return p and q

    def analyze(self, logs):
        """Archive (which also cleans) every monitored folder over threshold."""
        self.lfolders = [folder['label'] for folder in logs]
        for item in logs:
            if self.can_clean(item):
                self.archive(item)
+ if self.can_clean(item) : + self.archive(item) + #self.clean(item) diff --git a/src/utils/agents/learner.py b/src/utils/agents/learner.py old mode 100644 new mode 100755 diff --git a/src/utils/agents/manager.py b/src/utils/agents/manager.py new file mode 100755 index 0000000..9eb811e --- /dev/null +++ b/src/utils/agents/manager.py @@ -0,0 +1,256 @@ +""" + Features : + - data collection + - detection, reboot (service) + - respond to commands (service) +""" +#from threading import Thread, RLock +from __future__ import division +import os +import json +import time +from datetime import datetime +from utils.transport import * +import monitor +import requests +class Manager() : + def version(self): + return 1.1 + """ + + delay : + limit : + scope : apps,folders,learner,sandbox + """ + def __init__(self): + self.factory = DataSourceFactory() + def set(self,name,value): + setattr(name,value) + def init(self,**args) : + self.id = args['node'] + self.agents = args['agents'] + self.config = dict(args['config']) + self.key = args['key'] + self.actors = args['actors'] + self.plan = self.config['plan'] + + self.DELAY = int(self.plan['metadata']['delay']) + + self.update() #-- Initializing status information + + def update(self) : + """ + This method inspect the plans for the current account and makes sure it can/should proceed + The user must be subscribed and to the service otherwise this is not going to work + """ + # url="https://the-phi.com/store/status/monitor" + # r = requests.get(url,headers={"uid":self.key}) + # plans = json.loads(r.text) + # meta = [item['metadata'] for item in plans if item['status']=='active' ] + + meta = self.plan['metadata'] + + if meta : + self.DELAY = 60* int(meta['delay']) + self.LIMIT = int(meta['limit']) + #dbname = [item['name'] for item in plans if int(item['metadata']['limit']) == self.LIMIT][0] + #self.config['store']['args']['dbname'] = dbname + + else: + self.DELAY = -1 + self.LIMIT = -1 + + #self.filter(meta) + + self.agents = 
self.filter('agents',meta,self.agents) + self.actors = self.filter('actors',meta,self.actors) + #self.setup(meta) + + def filter_collectors(self,meta) : + """ + remove collectors that are not specified by the plan + Note that the agents (collectors) have already been initialized ? + """ + values = meta['agents'].replace(' ','').split(',') + self.agents = [agent for agent in self.agents if agent.getName() in values] + + + def filter_actors(self,meta): + """ + removes actors that are NOT specified by the subscription plan + Note that the actor have already been instatiated and need initialization + """ + values = meta['actors'].replace(' ','').split('.') + self.actors = [actor for actor in self.actors if actor.getName() in values] + + def filter(self,id,meta,objects): + values = meta[id].replace(' ','').split(',') + return [item for item in objects if item.getName() in values] + def __filter(self,meta) : + scope = [] + lactors= [] + for item in meta : + scope = scope + item['scope'].split(',') + if 'actors' in item : + lactors= lactors + item['actors'].split(',') + self.agents = [agent for agent in self.agents if agent.getName() in scope] + if len(lactors) == 0 : + self.actors = [] + self.actors = [ actor for actor in self.actors if actor.getIdentifier() in lactors] + if len(self.actors) > 0 : + # + # We should configure the actors accordingly and make sure they are operational + # + + conf = {"apps":None} + # + # We need to get the configuration for the apps remotely + # + read_class = self.config['store']['class']['read'] + read_args = self.config['store']['args'] + couchdb = self.factory.instance(type=read_class,args=read_args) + uinfo = couchdb.view('config/apps',key=self.key) + if 'apps' in uinfo : + conf['apps'] = uinfo['apps'] + # + # Threshold will always give a default value + # + info = couchdb.view('config/folders',key=self.key) + threshold = info + conf['folder_threshold'] = threshold + + mailer = None + for actor in self.actors : + id = 
actor.getIdentifier() + if id == "mailer" : + mailer = actor.Mailer() + if conf[id] is None : + continue + args = conf[id] + actor.init(args) + # + # Initializing the mailer + if mailer is not None and mailer in self.config: + mailer.init(self.config['mailer']) + + return meta + def setup(self,meta) : + conf = {"folders":None,"apps":None} + read_class = self.config['store']['class']['read'] + read_args = self.config['store']['args'] + + + couchdb = self.factory.instance(type=read_class,args=read_args) + args = couchdb.view('config/apps',key=self.key) + if len(args.keys()) > 0 : + self.apply_setup('apps',args) + args = couchdb.view('config/folders',key=self.key) + + if 'folder_size' not in meta : + args['threshold'] = meta['folder_size'] + self.apply_setup('folders',args) + + def apply_setup(self,name,args) : + for actor in self.actors : + if args is not None and actor.getName() == name and len(args.keys()) > 0: + actor.init(args) + + + def __setup(self,meta): + # + # We should configure the actors accordingly and make sure they are operational + # + conf = {"folders":meta['folder_threshold'],"apps":None} + # + # We need to get the configuration for the apps remotely + # + read_class = self.config['store']['class']['read'] + read_args = self.config['store']['args'] + + couchdb = self.factory.instance(type=read_class,args=read_args) + uinfo = couchdb.view('config/apps',key=self.key) + if 'apps' in uinfo : + conf['apps'] = uinfo['apps'] + + for agent in self.agents : + agent.init(conf['apps']) + + mailer = None + for actor in self.actors : + id = actor.getIdentifier() + if id == "mailer" : + mailer = actor.Mailer() + if conf[id] is None : + continue + args = conf[id] + actor.init(args) + # + # Initializing the mailer + if mailer is not None and mailer in self.config: + mailer.init(self.config['mailer']) + + def isvalid(self): + self.update() + return self.DELAY > -1 and self.LIMIT > -1 + def post(self,row) : + """ + This function is designed to take appropriate action 
if a particular incident has been detected + @param label + @param row data pulled extracted + """ + message = {} + message['action'] = 'reboot' + message['node'] = label + + def callback(self,channel,method,header,stream): + """ + This function enables the manager to be able to receive messages and delegate them to the appropriate actor + @channel + """ + print [channel,header] + message = json.loads(stream) + data = message['data'] + + def run(self): + #DELAY=35*60 #- 35 Minutes + #LIMIT=1000 + COUNT = 0 + COUNT_STOP = int(24*60/ self.DELAY) + write_class = self.config['store']['class']['write'] + read_args = self.config['store']['args'] + while True : + COUNT += 1 + if COUNT > COUNT_STOP : + if self.isvalid() : + COUNT = 0 + else: + break + for agent in self.agents : + data = agent.composite() + label = agent.getName() + node = '@'.join([label,self.id]) + row = {} + if label == 'folders': + row = [ dict({"id":self.id}, **_row) for _row in data] + + else: + #label = id + row = data + if type(row)==list and len(row) == 0 : + continue + + # + # + index = self.agents.index(agent) + + if len(self.actors) > index and self.actors[index].getName() == agent.getName() : + actor = self.actors[index] + print actor.analyze(row) + + # self.lock.acquire() + store = self.factory.instance(type=write_class,args=read_args) + store.flush(size=self.LIMIT) + store.write(label=node,row=row) + # self.lock.release() + time.sleep(self.DELAY) + diff --git a/src/utils/transport.py b/src/utils/transport.py new file mode 100755 index 0000000..f56569e --- /dev/null +++ b/src/utils/transport.py @@ -0,0 +1,709 @@ +""" + This file implements data transport stuctures in order to allow data to be moved to and from anywhere + We can thus read data from disk and write to the cloud,queue, or couchdb or SQL +""" +from flask import request, session +import os +import pika +import json +import numpy as np +from couchdbkit import Server +import re +from csv import reader +from datetime import datetime 
#
# @TODO: Write a process by which the class automatically handles reading and
# creating a preliminary sample and discovers the meta data
#
class Reader:
    """
    Base class for readers: delimiter/column discovery plus record clean-up
    helpers shared by the concrete implementations.
    """
    def __init__(self):
        self.nrows = 0
        self.xchar = None   # detected field delimiter

    def row_count(self):
        content = self.read()
        return np.sum([1 for row in content])

    def delimiter(self, sample):
        """
        Determine the most common delimiter from a subset of candidates via a
        statistical approach: the candidate with the smallest column-count
        variance whose mean column count is > 1 wins.
        """
        m = {',': [], '\t': [], '|': [], '\x3A': []}
        for row in sample:
            for xchar in m:
                # the original compared a list to an int (`split(...) > 1`),
                # which always passed on py2 and raises on py3
                parts = row.split(xchar)
                m[xchar].append(len(parts) if len(parts) > 1 else 0)
        #
        # The delimiter with the smallest variance, provided the mean is
        # greater than 1. This would be troublesome if many broken records
        # were sampled.
        #
        m = {id: np.var(m[id]) for id in m.keys() if m[id] != [] and int(np.mean(m[id])) > 1}
        keys = list(m.keys())
        vals = list(m.values())   # py3: views have no .index()
        return keys[vals.index(min(vals))]

    def col_count(self, sample):
        """
        Determine the number of columns of a given sample (the most frequent
        column count across rows).
        @pre self.xchar is not None
        """
        m = {}
        for row in sample:
            row = self.format(row)
            id = str(len(row))
            if id not in m:
                m[id] = 0
            m[id] = m[id] + 1
        keys = list(m.keys())
        vals = list(m.values())
        return int(keys[vals.index(max(vals))])

    def format(self, row):
        """
        Clean the fields of a row: strip non-ascii characters and stray
        quotes. Accepts a raw line (split on self.xchar, preserving quoted
        fields) or a pre-split list.
        """
        if isinstance(row, list) == False:
            #
            # Fields sometimes legitimately contain the delimiter; split()
            # accounts for that instead of a blind row.split(self.xchar).
            cols = self.split(row)
        else:
            cols = row
        return [re.sub('[^\x00-\x7F,\n,\r,\v,\b,]', ' ', col.strip()).strip().replace('"', '') for col in cols]

    def split(self, row):
        """
        Split a record while preserving the integrity of double-quoted
        fields.
        @pre self.xchar is not None
        """
        pattern = "".join(["(?:^|", self.xchar, ")(\"(?:[^\"]+|\"\")*\"|[^", self.xchar, "]*)"])
        return re.findall(pattern, row.replace('\n', ''))
class Writer:
    """Base class for writers: row formatting plus archive/flush hooks."""

    def format(self, row, xchar):
        """Render *row* for output: delimiter-joined line, or JSON blob."""
        if xchar is not None and isinstance(row, list):
            return xchar.join(row) + '\n'
        elif xchar is None and isinstance(row, dict):
            row = json.dumps(row)
        return row

    def archive(self):
        """
        It is important to be able to archive data so as to insure that
        growth is controlled; nothing in nature grows indefinitely, neither
        should data being handled.
        """
        pass

    def flush(self):
        pass


class HttpRequestReader(Reader):
    """
    Reads data from an HTTP request file handler provided by flask.
    The file is held in memory and processed accordingly.
    NOTE: This is inefficient and can crash a micro-instance (be careful).
    """
    def __init__(self, **params):
        self.file_length = 0
        try:
            self.content = params['file'].readlines()
            self.file_length = len(self.content)
        except Exception as e:
            print("Error ... ", e)

    def isready(self):
        return self.file_length > 0

    def read(self, size=-1):
        """
        Yield up to *size* rows (-1 = all). The original pre-incremented the
        counter and so yielded size-2 rows; fixed to yield exactly *size*.
        """
        for i, row in enumerate(self.content):
            if 0 <= size <= i:
                break
            yield row


class HttpSessionWriter(Writer):
    """
    Writes processed rows into a session/cookie structure.
    @param queue required session object (dict-like)
    """
    def __init__(self, **params):
        self.session = params['queue']
        self.session['sql'] = []
        self.session['csv'] = []
        # strip the file extension; the original pattern '..+$' erased the
        # whole name (sibling SQLDiskWriter uses the escaped form)
        self.tablename = re.sub(r'\..+$', '', params['filename'])
        self.session['uid'] = params['uid']

    def format_sql(self, row):
        values = "','".join([col.replace('"', '').replace("'", '') for col in row])
        return "".join(["INSERT INTO :table VALUES('", values, "');\n"]).replace(':table', self.tablename)

    def isready(self):
        return True

    def write(self, **params):
        label = params['label']
        row = params['row']
        if label == 'usable':
            self.session['csv'].append(self.format(row, ','))
            self.session['sql'].append(self.format_sql(row))


class DiskReader(Reader):
    """
    Reads data from disk (location on hard drive).
    @pre isready() == True
    @param path absolute path of the file to be read
    """
    def __init__(self, **params):
        Reader.__init__(self)
        self.path = params['path']

    def isready(self):
        return os.path.exists(self.path)

    def read(self, size=-1):
        """
        Yield up to *size* rows from the file (-1 = all).
        ('rU' mode was removed in py3.11; universal newlines are the default)
        """
        with open(self.path, 'r') as f:
            for i, row in enumerate(f):
                if 0 <= size <= i:
                    break
                yield row


class DiskWriter(Writer):
    """Writes output to a designated location on disk."""
    def __init__(self, **params):
        self.path = params.get('path')
        self.name = params.get('name')
        # guard: os.path.exists(None) raises TypeError on py3
        if self.path is not None and os.path.exists(self.path) == False:
            os.mkdir(self.path)

    def isready(self):
        """Ready when both path and name are set and the path exists."""
        p = self.path is not None and os.path.exists(self.path)
        q = self.name is not None
        return p and q

    def write(self, **params):
        """
        Append a formatted record under <path>/<label>/<name>.
        @param label sub-folder
        @param row   row to be written
        @param xchar optional delimiter
        """
        label = params['label']
        row = params['row']
        # the original tested `'xchar' is not None` (always true) and then
        # indexed params unconditionally -- KeyError when xchar was omitted
        xchar = params['xchar'] if 'xchar' in params else None
        path = ''.join([self.path, os.sep, label])
        if os.path.exists(path) == False:
            os.mkdir(path)
        path = ''.join([path, os.sep, self.name])
        with open(path, 'a') as f:
            f.write(self.format(row, xchar))


class MessageQueue:
    """
    Base for interactions with a queue server via pika (tested against
    rabbitmq).
    @param host queue server host
    @param uid  exchange identifier
    @param qid  queue identifier
    """
    def __init__(self, **params):
        self.host = params['host']
        self.uid = params['uid']
        self.qid = params['qid']

    def isready(self):
        resp = self.connection is not None and self.connection.is_open
        self.close()
        return resp

    def close(self):
        if self.connection.is_closed == False:
            self.channel.close()
            self.connection.close()


class QueueWriter(MessageQueue, Writer):
    """
    Publishes content to an AMQP (RabbitMQ) exchange for a given queue.
    """
    def __init__(self, **params):
        MessageQueue.__init__(self, **params)

    def init(self, label=None):
        """Open connection/channel and declare exchange + durable queue."""
        properties = pika.ConnectionParameters(host=self.host)
        self.connection = pika.BlockingConnection(properties)
        self.channel = self.connection.channel()
        self.info = self.channel.exchange_declare(exchange=self.uid, type='direct', durable=True)
        qname = self.qid if label is None else label
        self.qhandler = self.channel.queue_declare(queue=qname, durable=True)
        self.channel.queue_bind(exchange=self.uid, queue=self.qhandler.method.queue)

    def write(self, **params):
        """
        Publish one object (converted to JSON unless already a string) to the
        queue named by *label*.
        @TODO: make this less chatty (connect/close per message)
        """
        xchar = params['xchar'] if 'xchar' in params else None
        payload = self.format(params['row'], xchar)
        label = params['label']
        self.init(label)
        _mode = 2   # persistent delivery
        if isinstance(payload, str):
            stream = payload
            _type = 'text/plain'
        else:
            stream = json.dumps(payload)
            _type = params['type'] if 'type' in params else 'application/json'
        self.channel.basic_publish(
            exchange=self.uid,
            routing_key=label,
            body=stream,
            properties=pika.BasicProperties(content_type=_type, delivery_mode=_mode)
        )
        self.close()

    def flush(self, label):
        self.init(label)
        self.channel.queue_delete(queue=label)
        self.close()
class QueueReader(MessageQueue, Reader):
    """
    Reads from queue(s) provided an exchange, queue and host.
    @TODO: account for security and virtualhosts
    @param host host
    @param uid  exchange identifier
    @param qid  queue identifier (str or list of str)
    """
    def __init__(self, **params):
        MessageQueue.__init__(self, **params)
        self.durable = 'durable' in params
        self.size = -1      # max messages to read (-1 = all)
        self.data = {}      # messages staked per queue name

    def init(self, qid):
        properties = pika.ConnectionParameters(host=self.host)
        self.connection = pika.BlockingConnection(properties)
        self.channel = self.connection.channel()
        self.channel.exchange_declare(exchange=self.uid, type='direct', durable=True)
        self.info = self.channel.queue_declare(queue=qid, durable=True)

    def callback(self, channel, method, header, stream):
        """
        Consumer callback: JSON-decode when the payload looks like JSON,
        stash by queue name, stop consuming when all (or *size*) messages of
        the queue have been staked.
        """
        if re.match(r"^\{|\[", stream) is not None:
            r = json.loads(stream)
        else:
            r = stream
        qid = self.info.method.queue
        if qid not in self.data:
            self.data[qid] = []
        self.data[qid].append(r)
        if self.size == len(self.data[qid]) or len(self.data[qid]) == self.info.method.message_count:
            self.close()

    def read(self, size=-1):
        """
        Drain each configured queue (sequentially for now) into self.data.
        @TODO: implement channel.basic_get to honour *size* exactly.
        """
        self.size = size
        # the reader can read from several queues; normalize to a list
        # (py3: basestring no longer exists)
        if isinstance(self.qid, str):
            self.qid = [self.qid]
        for qid in self.qid:
            self.init(qid)
            if self.info.method.message_count > 0:
                self.channel.basic_consume(self.callback, queue=qid, no_ack=False)
                self.channel.start_consuming()
        return self.data


class QueueListener(QueueReader):
    """Blocking listener: binds to its queue and consumes indefinitely."""
    def init(self, qid):
        properties = pika.ConnectionParameters(host=self.host)
        self.connection = pika.BlockingConnection(properties)
        self.channel = self.connection.channel()
        self.channel.exchange_declare(exchange=self.uid, type='direct', durable=True)
        self.info = self.channel.queue_declare(passive=True, exclusive=True, queue=qid)
        self.channel.queue_bind(exchange=self.uid, queue=self.info.method.queue, routing_key=qid)

    def read(self):
        self.init(self.qid)
        self.channel.basic_consume(self.callback, queue=self.qid, no_ack=True)
        self.channel.start_consuming()


class SQLDiskWriter(DiskWriter):
    """
    Writes output as SQL INSERT statements; inherits DiskWriter with minor
    adjustments. Table is named after the file (extension stripped).
    @TODO: include script to create the table using the upper bound of a learner
    """
    def __init__(self, **args):
        DiskWriter.__init__(self, **args)
        self.tablename = re.sub(r'\..+$', '', self.name).replace(' ', '_')

    def write(self, **args):
        """
        Convert 'usable' rows into INSERT statements before delegating to
        DiskWriter.
        @param label
        @param row
        """
        label = args['label']
        row = args['row']
        if label == 'usable':
            values = "','".join([col.replace('"', '').replace("'", '') for col in row])
            row = "".join(["INSERT INTO :table VALUES('", values, "');\n"]).replace(':table', self.tablename)
        args['row'] = row
        DiskWriter.write(self, **args)


class Couchdb:
    """
    Couchdb base: connects and insures the user's document exists.
    @param uri    host & port reference
    @param uid    user id involved (document id)
    @param dbname database name (target)
    """
    def __init__(self, **args):
        uri = args['uri']
        self.uid = args['uid']
        dbname = args['dbname']
        self.server = Server(uri=uri)
        self.dbase = self.server.get_db(dbname)
        if self.dbase.doc_exist(self.uid) == False:
            self.dbase.save_doc({"_id": self.uid})

    def isready(self):
        """Server reachable, database exists, and the user's doc exists."""
        p = self.server.info() != {}
        if p == False or self.dbase.dbname not in self.server.all_dbs():
            return False
        #
        # At this point the server is connected and the database exists;
        # insure the user's document exists too.
        #
        return self.dbase.doc_exist(self.uid)

    def view(self, id, **args):
        """Return the first value of a view result, or [] when empty."""
        r = self.dbase.view(id, **args).all()
        return r[0]['value'] if len(r) > 0 else []
class CouchdbReader(Couchdb, Reader):
    """
    Reads an attachment (or the bare document) from couchdb; the attachment
    must have been placed beforehand (otherwise oops).
    @TODO: account for security & access control
    @param filename optional attachment name
    """
    def __init__(self, **args):
        Couchdb.__init__(self, **args)
        self.filename = args['filename'] if 'filename' in args else None

    def isready(self):
        """Database ready AND the requested attachment exists on the doc."""
        if Couchdb.isready(self) == False:
            return False
        doc = self.dbase.get(self.uid)
        if '_attachments' in doc:
            return self.filename in doc['_attachments'].keys()
        return False

    def stream(self, size=-1):
        """
        Yield up to *size* lines of the attachment (-1 = all).
        (the original referenced an undefined `size`; now a parameter)
        """
        content = self.dbase.fetch_attachment(self.uid, self.filename).split('\n')
        i = 1
        for row in content:
            yield row
            if size > 0 and i == size:
                break
            i = i + 1

    def read(self, size=-1):
        if self.filename is not None:
            # the original called stream() without returning the generator
            return self.stream(size)
        return self.basic_read()

    def basic_read(self):
        """Return the document minus couchdb bookkeeping fields."""
        document = self.dbase.get(self.uid)
        del document['_id'], document['_rev']
        return document


class CouchdbWriter(Couchdb, Writer):
    """
    Writes on a couchdb document; *label* is the attribute (scope) written.
    @param uri      host & port reference
    @param uid      user id involved
    @param filename optional attachment name
    @param dbname   database name (target)
    """
    def __init__(self, **args):
        Couchdb.__init__(self, **args)
        uri = args['uri']
        self.uid = args['uid']
        self.filename = args['filename'] if 'filename' in args else None
        dbname = args['dbname']
        self.server = Server(uri=uri)
        self.dbase = self.server.get_db(dbname)
        # Couchdb.__init__ already created the document if it was missing

    def write(self, **params):
        """
        Write a given attribute to the document.
        @param label scope of the row repair|broken|fixed|stats
        @param row   row to be written (or @param data dict to merge)
        """
        document = self.dbase.get(self.uid)
        label = params['label']
        if 'row' in params:
            row = params['row']
            row_is_list = isinstance(row, list)
            if label not in document:
                document[label] = row if row_is_list else [row]
            elif isinstance(document[label][0], list):
                document[label].append(row)
            else:
                document[label] += row
        else:
            if label not in document:
                document[label] = {}
            if isinstance(params['data'], object):
                document[label] = dict(document[label], **params['data'])
            else:
                document[label] = params['data']
        self.dbase.save_doc(document)

    def flush(self, **params):
        """
        Truncate list attributes to the last *size* entries, or clear
        non-list attributes entirely (bookkeeping fields untouched).
        """
        size = params['size'] if 'size' in params else 0
        document = self.dbase.get(self.uid)
        for key in document:
            if key in ['_id', '_rev', '_attachments']:
                continue
            content = document[key]
            if isinstance(content, list) and size > 0:
                document[key] = content[len(content) - size:]
            else:
                document[key] = {}
        self.dbase.save_doc(document)

    def archive(self, params=None):
        """Move the document's content into a timestamped JSON attachment."""
        document = self.dbase.get(self.uid)
        content = {}
        _doc = {}
        for id in document:
            if id in ['_id', '_rev', '_attachments']:
                _doc[id] = document[id]
            else:
                content[id] = document[id]
        content = json.dumps(content)
        document = _doc
        now = str(datetime.today())
        name = '-'.join([document['_id'], now, '.json'])
        self.dbase.save_doc(document)
        self.dbase.put_attachment(document, content, name, 'application/json')


class DataSourceFactory:
    """
    Factory generating a Reader/Writer instance against a
    Queue, Disk, Cloud or Couchdb by class name. Parameter validation is not
    enforced: any error with the parameters yields None.
    """
    def instance(self, **args):
        source = args['type']
        params = args['args']
        anObject = None
        try:
            # resolve the class in the module namespace instead of eval()'ing
            # a JSON-built expression -- same behaviour, no injection surface,
            # and non-JSON-serializable params (file handles) now work too
            aClass = globals()[source]
            anObject = aClass(**params)
        except Exception as e:
            print(['Error ', e])
        return anObject


class DataSource:
    """
    Facade over the heterogeneous Reader/Writer hierarchy: read/write
    anywhere transparently.
    """
    def __init__(self, sourceType='Disk', outputType='Disk', params=None):
        # avoid the original's mutable default argument
        params = {} if params is None else params
        # the original called DataSourceFactory.instance on the CLASS
        # (missing self); instantiate the factory first
        factory = DataSourceFactory()
        self.Input = factory.instance(type=sourceType, args=params)
        self.Output = factory.instance(type=outputType, args=params)

    def read(self, size=-1):
        return self.Input.read(size)

    def write(self, **args):
        self.Output.write(**args)

# p = {}
# p['host'] = 'dev.the-phi.com'
# p['uid'] = 'nyemba@gmail.com'
# p['qid'] = 'repair'
# factory = DataSourceFactory()
# o = factory.instance(type='QueueReader', args=p)
# print(o is None)