diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..acd5d78
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,19 @@
+"""
+This is the build/installation file for the data-transport package
+"""
+from setuptools import setup, find_packages
+
+setup(
+    name = "data-transport",
+    version = "1.0",
+    author = "The Phi Technology LLC",
+    author_email = "steve@the-phi.com",
+    license = "MIT",
+    packages=['transport'],
+    install_requires = ['pymongo','numpy','cloudant','pika','boto','flask-session','smart_open'],
+
+    use_2to3=True,
+    use_2to3_exclude_fixers=['lib2to3.fixes.fix_import'],
+    )
diff --git a/transport/__init__.py b/transport/__init__.py
new file mode 100644
index 0000000..9b4f540
--- /dev/null
+++ b/transport/__init__.py
@@ -0,0 +1,209 @@
+"""
+Data Transport - 1.0
+Steve L. Nyemba, The Phi Technology LLC
+
+This module is designed to serve as a wrapper to a set of supported data stores :
+    - couchdb
+    - mongodb
+    - Files (character delimited)
+    - Queues (RabbitMQ)
+    - Session (Flask)
+    - s3
+The supported operations are read/write, along with providing metadata to the calling code
+Requirements :
+    pymongo
+    boto
+    cloudant
+The configuration for the data-store is as follows :
+    couchdb:
+        {
+            args:{
+                url:,
+                username:,
+                password:,
+                dbname:,
+                uid:
+            }
+        }
+    RabbitMQ:
+        {
+
+        }
+    Mongodb:
+        {
+            args:{
+                host:, #localhost:27017
+                username:,
+                password:,
+                dbname:,
+                uid:
+
+            }
+        }
+"""
+__author__ = 'The Phi Technology'
+import numpy as np
+import json
+import importlib
+from common import Reader, Writer #, factory
+# import disk
+# import queue
+# import couch
+# import mongo
+# import s3
+class factory :
+    @staticmethod
+    def instance(**args):
+        """
+        This function creates an instance of a transport class given its type and constructor arguments
+        :type   name of the transport class we are trying to create
+        :args   the arguments needed to create the instance
+        """
+        source = args['type']
+        params = args['args']
+        anObject = None
+
+        if source in ['HttpRequestReader','HttpSessionWriter']:
+            #
+            # @TODO: Make sure objects are serializable, be smart about them !!
+            #
+            aClassName = ''.join([source,'(**params)'])
+
+
+        else:
+
+            stream = json.dumps(params)
+            aClassName = ''.join([source,'(**',stream,')'])
+        try:
+            anObject = eval( aClassName)
+            #setattr(anObject,'name',source)
+        except Exception,e:
+            print 'Error ',e
+        return anObject
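+#
+# A hypothetical usage sketch (the type/args values below are illustrative,
+# not a prescribed configuration). Note that the disk/queue/couch/mongo
+# imports above are still commented out, so the requested class must be
+# in scope for eval to resolve it :
+#
+#   from transport import factory
+#   reader = factory.instance(type='DiskReader',args={'path':'/tmp/sample.csv'})
+#   if reader is not None and reader.isready():
+#       rows = list(reader.read())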
diff --git a/transport/__init__.pyc b/transport/__init__.pyc
new file mode 100644
index 0000000..0311a20
Binary files /dev/null and b/transport/__init__.pyc differ
diff --git a/transport/common.py b/transport/common.py
new file mode 100644
index 0000000..c55141b
--- /dev/null
+++ b/transport/common.py
@@ -0,0 +1,154 @@
+"""
+Data Transport - 1.0
+Steve L. Nyemba, The Phi Technology LLC
+
+This module is designed to serve as a wrapper to a set of supported data stores :
+    - couchdb
+    - mongodb
+    - Files (character delimited)
+    - Queues (RabbitMQ)
+    - Session (Flask)
+    - s3
+The supported operations are read/write, along with providing metadata to the calling code
+Requirements :
+    pymongo
+    boto
+    cloudant
+
+"""
+__author__ = 'The Phi Technology'
+import numpy as np
+import json
+import re
+import importlib
+# import couch
+# import mongo
+class Reader:
+    def __init__(self):
+        self.nrows = 0
+        self.xchar = None
+
+    def row_count(self):
+        content = self.read()
+        return np.sum([1 for row in content])
+    def delimiter(self,sample):
+        """
+        This function determines the most common delimiter from a subset of possible delimiters.
+        It uses a statistical approach (distribution) to gauge the distribution of columns for a given delimiter
+
+        :sample sample string/content expecting a matrix i.e a list of rows
+        """
+
+        m = {',':[],'\t':[],'|':[],'\x3A':[]}
+        delim = m.keys()
+        for row in sample:
+            for xchar in delim:
+                if len(row.split(xchar)) > 1:
+                    m[xchar].append(len(row.split(xchar)))
+                else:
+                    m[xchar].append(0)
+
+
+
+        #
+        # The delimiter with the smallest variance, provided the mean is greater than 1
+        # This would be troublesome if there are many broken records in the sample
+        #
+        m = {id: np.var(m[id]) for id in m.keys() if m[id] != [] and int(np.mean(m[id]))>1}
+        index = m.values().index( min(m.values()))
+        xchar = m.keys()[index]
+
+        return xchar
+    def col_count(self,sample):
+        """
+        This function returns the number of columns of a given sample
+        @pre self.xchar is not None
+        """
+
+        m = {}
+        i = 0
+
+        for row in sample:
+            row = self.format(row)
+            id = str(len(row))
+            #id = str(len(row.split(self.xchar)))
+
+            if id not in m:
+                m[id] = 0
+            m[id] = m[id] + 1
+
+        index = m.values().index( max(m.values()) )
+        ncols = int(m.keys()[index])
+
+
+        return ncols;
+    def format (self,row):
+        """
+        This function will clean the records of a given row by removing non-ascii characters
+        @pre self.xchar is not None
+        """
+
+        if not isinstance(row,list):
+            #
+            # We've observed that fields sometimes contain the delimiter as a legitimate character; we need to account for this and not tamper with the field values (unless necessary)
+            cols = self.split(row)
+            #cols = row.split(self.xchar)
+        else:
+            cols = row ;
+        return [ re.sub('[^\x00-\x7F,\n,\r,\v,\b,]',' ',col.strip()).strip().replace('"','') for col in cols]
+
+    def split (self,row):
+        """
+        This function splits a record while attempting to preserve the integrity of the data within i.e accounting for double quotes.
+        @pre : self.xchar is not None
+        """
+
+        pattern = "".join(["(?:^|",self.xchar,")(\"(?:[^\"]+|\"\")*\"|[^",self.xchar,"]*)"])
+        return re.findall(pattern,row.replace('\n',''))
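+#
+# A small, hypothetical illustration of the delimiter heuristic above
+# (the sample rows are made up; any Reader subclass inherits this method) :
+#
+#   rows = ['a,b,c','1,2,3','4,5,6']
+#   r = Reader()
+#   print r.delimiter(rows)     # -> ',' : commas yield a consistent column count > 1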
+
+class Writer:
+
+    def format(self,row,xchar):
+        if xchar is not None and isinstance(row,list):
+            return xchar.join(row)+'\n'
+        elif xchar is None and isinstance(row,dict):
+            row = json.dumps(row)
+        return row
+    def archive(self):
+        """
+        It is important to be able to archive data so as to ensure that growth is controlled
+        Nothing in nature grows indefinitely; neither should the data being handled.
+        """
+        pass
+    def flush(self):
+        pass
diff --git a/transport/common.pyc b/transport/common.pyc
new file mode 100644
index 0000000..91a168c
Binary files /dev/null and b/transport/common.pyc differ
diff --git a/transport/couch.py b/transport/couch.py
new file mode 100644
index 0000000..6368a27
--- /dev/null
+++ b/transport/couch.py
@@ -0,0 +1,199 @@
+"""
+Data-Transport
+Steve L. Nyemba, The Phi Technology
+
+This file is a wrapper around couchdb using the IBM Cloudant SDK, which has an interface to couchdb
+
+"""
+import cloudant
+import json
+from datetime import datetime
+from common import Reader,Writer
+class Couch:
+    """
+    @param url      host & port reference
+    @param uid      user id involved
+
+    @param dbname   database name (target)
+    """
+    def __init__(self,**args):
+        url         = args['url']
+        self.uid    = args['uid']
+        dbname      = args['dbname']
+        if 'username' not in args and 'password' not in args :
+            self.server = cloudant.CouchDB(url=url)
+        else:
+            self.server = cloudant.CouchDB(args['username'],args['password'],url=url)
+        self.server.connect()
+
+        if dbname in self.server.all_dbs() :
+            self.dbase  = self.server.get(dbname,dbname,True)
+            #
+            # @TODO Check if the database exists ...
+            #
+            doc = cloudant.document.Document(self.dbase,self.uid) #self.dbase.get(self.uid)
+            if not doc.exists():
+                doc = self.dbase.create_document({"_id":self.uid})
+                doc.save()
+        else:
+            self.dbase = None
+    def isready(self):
+        """
+        Ensuring the preconditions are met for processing
+        """
+        p = self.server.metadata() != {}
+        if p == False or not self.dbase:
+            return False
+        #
+        # At this point we are sure that the server is connected
+        # We are also sure that the database actually exists
+        #
+        doc = cloudant.document.Document(self.dbase,self.uid)
+        # q = self.dbase.all_docs(key=self.uid)['rows']
+        # if not q :
+        if not doc.exists():
+            return False
+        return True
+
+    def view(self,**args):
+        """
+        We are executing a view
+        :id         design document _design/xxxx (provide the full name with the _design prefix)
+        :view_name  name of the view i.e
+        :key        key to be used to filter the content
+        """
+        document = cloudant.design_document.DesignDocument(self.dbase,args['id'])
+        document.fetch()
+        params = {'group_level':1,'group':True}
+        if 'key' in  args :
+            params ['key'] = args['key']
+        elif 'keys' in args :
+            params['keys'] = args['keys']
+        return document.get_view(args['view_name'])(**params)['rows']
+
+
+
+
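+# A hypothetical configuration/usage sketch (the url, credentials and names
+# are placeholders, not defaults) :
+#
+#   args = {'url':'http://localhost:5984','uid':'sample-doc','dbname':'demo',
+#           'username':'admin','password':'secret'}
+#   couch = Couch(**args)
+#   print couch.isready()
+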
+class CouchReader(Couch,Reader):
+    """
+    This class will read an attachment from couchdb and return it to the calling code.
+    The attachment must have been placed beforehand (otherwise oops)
+    @T: Account for security & access control
+    """
+    def __init__(self,**args):
+        """
+        @param filename filename (attachment)
+        """
+        #
+        # setting the basic parameters for
+        Couch.__init__(self,**args)
+        if 'filename' in args :
+            self.filename   = args['filename']
+        else:
+            self.filename   = None
+
+    # def isready(self):
+    #   #
+    #   # Is the basic information about the database valid
+    #   #
+    #   p = Couchdb.isready(self)
+
+    #   if p == False:
+    #       return False
+    #   #
+    #   # The database name is set and correct at this point
+    #   # We insure the document of the given user has the requested attachment.
+    #   #
+
+    #   doc = self.dbase.get(self.uid)
+
+    #   if '_attachments' in doc:
+    #       r = self.filename in doc['_attachments'].keys()
+
+    #   else:
+    #       r = False
+
+    #   return r
+    def stream(self):
+        #
+        # @TODO Need to get this working ...
+        #
+        document = cloudant.document.Document(self.dbase,self.uid)
+        # content = self.dbase.fetch_attachment(self.uid,self.filename).split('\n') ;
+        content = document.get_attachment(self.filename)
+        for row in content:
+            yield row
+
+    def read(self,size=-1):
+        if self.filename is not None:
+            return self.stream()
+        else:
+            return self.basic_read()
+    def basic_read(self):
+        document = cloudant.document.Document(self.dbase,self.uid)
+
+        # document = self.dbase.get(self.uid)
+        if document.exists() :
+            document.fetch()
+            document = dict(document)
+            del document['_rev']
+        else:
+            document = {}
+        return document
+
+class CouchWriter(Couch,Writer):
+    """
+    This class will write on a couchdb document provided a scope
+    The scope is the attribute that will be on the couchdb document
+    """
+    def __init__(self,**args):
+        """
+        @param url      host & port reference
+        @param uid      user id involved
+        @param filename filename (attachment)
+        @param dbname   database name (target)
+        """
+
+        Couch.__init__(self,**args)
+
+    def write(self,**params):
+        """
+        write a given attribute to a document database
+        @param label    scope of the row repair|broken|fixed|stats
+        @param row      row to be written
+        """
+
+        # document = self.dbase.get(self.uid)
+        document = cloudant.document.Document(self.dbase,self.uid) #.get(self.uid)
+        if document.exists() is False :
+            document = self.dbase.create_document({"_id":self.uid})
+        label = params['label']
+        row = params['row']
+        if label not in document :
+            document[label] = []
+        document[label].append(row)
+        document.save()
+        # self.dbase.bulk_docs([document])
+        # self.dbase.save_doc(document)
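+
+    # A hypothetical write, appending a row under the "stats" label of the
+    # user's document (the values are illustrative) :
+    #
+    #   writer = CouchWriter(url='http://localhost:5984',uid='sample-doc',dbname='demo')
+    #   writer.write(label='stats',row={'date':'2019-01-01','count':10})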
+
+    def archive(self,params=None):
+        """
+        This function will archive the document onto itself.
+        """
+        # document = self.dbase.all_docs(self.uid,include_docs=True)
+        document = cloudant.document.Document(self.dbase,self.uid)
+        document.fetch()
+        content = {}
+        # _doc = {}
+        for id in document:
+            if  id not in ['_id','_rev','_attachments'] :
+                content[id] = document[id]
+                del document[id]
+
+        content = json.dumps(content)
+        # document= _doc
+        now = str(datetime.today())
+
+        name = '-'.join([document['_id'] , now]) + '.json'
+        # self.dbase.bulk_docs([document])
+        # self.dbase.put_attachment(document,content,name,'application/json')
+        document.put_attachment(name,'application/json',content)
+        document.save()
diff --git a/transport/couch.pyc b/transport/couch.pyc
new file mode 100644
index 0000000..e74fec2
Binary files /dev/null and b/transport/couch.pyc differ
diff --git a/transport/couchdb.pyc b/transport/couchdb.pyc
new file mode 100644
index 0000000..2639714
Binary files /dev/null and b/transport/couchdb.pyc differ
diff --git a/transport/disk.py b/transport/disk.py
new file mode 100644
index 0000000..5186698
--- /dev/null
+++ b/transport/disk.py
@@ -0,0 +1,82 @@
+import os
+from .__init__ import Reader,Writer
+import json
+
+class DiskReader(Reader) :
+    """
+    This class is designed to read data from disk (a location on the hard drive)
+    @pre : isready() == True
+    """
+
+    def __init__(self,**params):
+        """
+        @param path absolute path of the file to be read
+        """
+
+        Reader.__init__(self)
+        self.path = params['path'] ;
+
+    def isready(self):
+        return os.path.exists(self.path)
+    def read(self,size=-1):
+        """
+        This function reads the rows from a designated location on disk
+        @param size number of rows to be read, -1 suggests all rows
+        """
+
+        f = open(self.path,'rU')
+        i = 0
+        for row in f:
+            if i == size:
+                break
+            yield row
+            i += 1
+        f.close()
+class DiskWriter(Writer):
+    """
+    This class writes output to disk in a designated location
+    """
+
+    def __init__(self,**params):
+        if 'path' in params:
+            self.path = params['path']
+        else:
+            self.path = None
+        if 'name' in params:
+            self.name = params['name'];
+        else:
+            self.name = None
+        if self.path is not None and os.path.exists(self.path) == False:
+            os.mkdir(self.path)
+
+    def isready(self):
+        """
+        This function determines if the class is ready for execution or not
+        i.e it determines if the preconditions are met prior to execution
+        """
+
+        p =  self.path is not None and os.path.exists(self.path)
+        q = self.name is not None
+        return p and q
+    def write(self,**params):
+        """
+        This function writes a record to a designated file
+        @param label
+        @param row  row to be written
+        """
+
+        label   = params['label']
+        row     = params['row']
+        xchar   = None
+        if 'xchar' in params:
+            xchar = params['xchar']
+        path    = ''.join([self.path,os.sep,label])
+        if os.path.exists(path) == False:
+            os.mkdir(path) ;
+        path    = ''.join([path,os.sep,self.name])
+        f = open(path,'a')
+        row = self.format(row,xchar);
+        f.write(row)
+        f.close()
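+#
+# A hypothetical round trip (the paths are illustrative) : the writer nests
+# the file under path/label, so the reader points at that full path :
+#
+#   w = DiskWriter(path='/tmp/data',name='sample.csv')
+#   w.write(label='stats',row=['1','2','3'],xchar=',')
+#   r = DiskReader(path='/tmp/data/stats/sample.csv')
+#   rows = list(r.read())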
diff --git a/transport/disk.pyc b/transport/disk.pyc
new file mode 100644
index 0000000..3c23d49
Binary files /dev/null and b/transport/disk.pyc differ
diff --git a/transport/mongo.py b/transport/mongo.py
new file mode 100644
index 0000000..5c11e56
--- /dev/null
+++ b/transport/mongo.py
@@ -0,0 +1,66 @@
+from pymongo import MongoClient
+# from transport import Reader,Writer
+from common import Reader, Writer
+import json
+class Mongo :
+    """
+    Basic mongodb functions are captured here
+    """
+    def __init__(self,**args):
+        """
+        :dbname     database name/identifier
+        :host       host and port of the database
+        :username   username for authentication
+        :password   password for the current user
+        """
+        host = args['host']
+
+        if 'username' in args and 'password' in args:
+            self.client = MongoClient(host,
+                    username=args['username'] ,
+                    password=args['password'] ,
+                    authMechanism='SCRAM-SHA-256')
+        else:
+            self.client = MongoClient(host)
+
+        self.uid    = args['uid']   #-- document identifier
+        self.dbname = args['dbname']
+        self.db = self.client[self.dbname]
+
+    def isready(self):
+        p = self.dbname in self.client.list_database_names()
+        q = self.uid in self.client[self.dbname].list_collection_names()
+        return p and q
+
+class MongoReader(Mongo,Reader):
+    """
+    This class will read from a mongodb data store and return the content of a document (not a collection)
+    """
+    def __init__(self,**args):
+        Mongo.__init__(self,**args)
+    def read(self,size=-1):
+        collection = self.db[self.uid]
+        return collection.find({})
+    def view(self,**args):
+        """
+        This function is designed to execute a view (map/reduce) operation
+        """
+        pass
+class MongoWriter(Mongo,Writer):
+    """
+    This class is designed to write to a mongodb collection within a database
+    """
+    def __init__(self,**args):
+        Mongo.__init__(self,**args)
+    def write(self,**args):
+        # document = self.db[self.uid].find()
+        collection = self.db[self.uid]
+        collection.insert_one(args['row'])
+    def set(self,document):
+        collection = self.db[self.uid]
+        if collection.count_documents({}) > 0 :
+            collection.delete_one({"_id":self.uid})
+
+        collection.update_one({"_id":self.uid},{"$set":document},upsert=True)
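+#
+# A hypothetical usage sketch (host, database and collection names are
+# placeholders, not defaults) :
+#
+#   writer = MongoWriter(host='localhost:27017',dbname='demo',uid='logs')
+#   writer.write(row={'status':'ok'})
+#   reader = MongoReader(host='localhost:27017',dbname='demo',uid='logs')
+#   for doc in reader.read():
+#       print doc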
diff --git a/transport/mongo.pyc b/transport/mongo.pyc
new file mode 100644
index 0000000..0f52768
Binary files /dev/null and b/transport/mongo.pyc differ
diff --git a/transport/queue.py b/transport/queue.py
new file mode 100644
index 0000000..846f591
--- /dev/null
+++ b/transport/queue.py
@@ -0,0 +1,200 @@
+import pika
+from datetime import datetime
+import re
+import json
+import os
+from common import Reader, Writer
+
+class MessageQueue:
+    """
+    This class hierarchy is designed to handle interactions with a queue server using the pika framework (our tests are based on rabbitmq)
+    :host
+    :uid    identifier of the exchange
+    :qid    identifier of the queue
+    """
+    def __init__(self,**params):
+        self.host= params['host']
+        self.uid = params['uid']
+        self.qid = params['qid']
+
+    def isready(self):
+        #self.init()
+        resp =  self.connection is not None and self.connection.is_open
+        self.close()
+        return resp
+    def close(self):
+        if self.connection.is_closed == False :
+            self.channel.close()
+            self.connection.close()
+
+class QueueWriter(MessageQueue,Writer):
+    """
+    This class is designed to publish content to an AMQP queue server (RabbitMQ)
+    The class relies on pika to implement this functionality
+
+    We publish information to a given queue for a given exchange
+    """
+    def __init__(self,**params):
+        #self.host= params['host']
+        #self.uid = params['uid']
+        #self.qid = params['queue']
+        MessageQueue.__init__(self,**params);
+
+
+    def init(self,label=None):
+        properties = pika.ConnectionParameters(host=self.host)
+        self.connection = pika.BlockingConnection(properties)
+        self.channel    = self.connection.channel()
+        self.info = self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True)
+        if label is None:
+            self.qhandler = self.channel.queue_declare(queue=self.qid,durable=True)
+        else:
+            self.qhandler = self.channel.queue_declare(queue=label,durable=True)
+
+        self.channel.queue_bind(exchange=self.uid,queue=self.qhandler.method.queue)
+
+
+
+    def write(self,**params):
+        """
+        This function writes a stream of data to a given queue
+        @param object   object to be written (will be converted to JSON)
+        @TODO: make this less chatty
+        """
+        xchar = None
+        if 'xchar' in params:
+            xchar = params['xchar']
+        object = self.format(params['row'],xchar)
+
+        label   = params['label']
+        self.init(label)
+        _mode = 2
+        if isinstance(object,str):
+            stream = object
+            _type = 'text/plain'
+        else:
+            stream = json.dumps(object)
+            if 'type' in params :
+                _type = params['type']
+            else:
+                _type = 'application/json'
+
+        self.channel.basic_publish(
+            exchange=self.uid,
+            routing_key=label,
+            body=stream,
+            properties=pika.BasicProperties(content_type=_type,delivery_mode=_mode)
+        );
+        self.close()
+
+    def flush(self,label):
+        self.init(label)
+        _mode = 1   #-- Non persistent
+        self.channel.queue_delete( queue=label);
+        self.close()
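+#
+# A hypothetical publish (the exchange/queue names are placeholders; a
+# rabbitmq broker is assumed to be running on the host) :
+#
+#   writer = QueueWriter(host='localhost',uid='demo-exchange',qid='demo-queue')
+#   writer.write(label='demo-queue',row={'status':'ok'})
+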
+class QueueReader(MessageQueue,Reader):
+    """
+    This class will read from a queue provided an exchange, queue and host
+    @TODO: Account for security and virtualhosts
+    """
+
+    def __init__(self,**params):
+        """
+        @param host host
+        @param uid  exchange identifier
+        @param qid  queue identifier
+        """
+
+        #self.host= params['host']
+        #self.uid = params['uid']
+        #self.qid = params['qid']
+        MessageQueue.__init__(self,**params);
+        if 'durable' in params :
+            self.durable = True
+        else:
+            self.durable = False
+        self.size = -1
+        self.data = {}
+    def init(self,qid):
+
+        properties = pika.ConnectionParameters(host=self.host)
+        self.connection = pika.BlockingConnection(properties)
+        self.channel    = self.connection.channel()
+        self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True)
+
+        self.info = self.channel.queue_declare(queue=qid,durable=True)
+
+
+    def callback(self,channel,method,header,stream):
+        """
+        This is the callback function designed to process the data stream from the queue
+
+        """
+
+        r = []
+        if re.match("^\{|\[",stream) is not None:
+            r = json.loads(stream)
+        else:
+
+            r = stream
+
+        qid = self.info.method.queue
+        if qid not in self.data :
+            self.data[qid] = []
+
+        self.data[qid].append(r)
+        #
+        # We stop reading when all the messages in the queue have been stacked
+        #
+        if self.size == len(self.data[qid]) or len(self.data[qid]) == self.info.method.message_count:
+            self.close()
+
+    def read(self,size=-1):
+        """
+        This function will read the first message from a queue
+        @TODO:
+            Implement channel.basic_get in order to retrieve a single message at a time
+            Have the number of messages retrieved be specified by size (parameter)
+        """
+        r = {}
+        self.size = size
+        #
+        # We enabled the reader to be able to read from several queues (sequentially for now)
+        # The qid parameter will be an array of queues the reader will be reading from
+        #
+        if isinstance(self.qid,basestring) :
+            self.qid = [self.qid]
+        for qid in self.qid:
+            self.init(qid)
+            # r[qid] = []
+
+            if self.info.method.message_count > 0:
+
+                self.channel.basic_consume(self.callback,queue=qid,no_ack=False);
+                self.channel.start_consuming()
+            else:
+
+                pass
+                #self.close()
+                # r[qid].append( self.data)
+
+        return self.data
+class QueueListener(QueueReader):
+    def init(self,qid):
+        properties = pika.ConnectionParameters(host=self.host)
+        self.connection = pika.BlockingConnection(properties)
+        self.channel    = self.connection.channel()
+        self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True )
+
+        self.info = self.channel.queue_declare(passive=True,exclusive=True,queue=qid)
+
+        self.channel.queue_bind(exchange=self.uid,queue=self.info.method.queue,routing_key=qid)
+        #self.callback = callback
+    def read(self):
+
+        self.init(self.qid)
+        self.channel.basic_consume(self.callback,queue=self.qid,no_ack=True);
+        self.channel.start_consuming()
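+#
+# A hypothetical consume (placeholder names; the queue must already exist
+# and hold messages, otherwise read returns an empty dict) :
+#
+#   reader = QueueReader(host='localhost',uid='demo-exchange',qid='demo-queue')
+#   data = reader.read(size=10)    # -> {'demo-queue': [...]}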
diff --git a/transport/queue.pyc b/transport/queue.pyc
new file mode 100644
index 0000000..281d8a0
Binary files /dev/null and b/transport/queue.pyc differ
diff --git a/transport/s3.py b/transport/s3.py
new file mode 100644
index 0000000..9b117db
--- /dev/null
+++ b/transport/s3.py
@@ -0,0 +1,83 @@
+from datetime import datetime
+import boto
+import botocore
+from smart_open import smart_open
+from common import Reader, Writer
+import json
+
+class s3 :
+    """
+    @TODO: Implement a search function for a file given a bucket??
+    """
+    def __init__(self,args) :
+        """
+        This class will extract a file or set of files from the s3 bucket provided
+        @param access_key
+        @param secret_key
+        @param path     location of the file
+        @param filter   filename or filtering elements
+        """
+        try:
+            self.s3 = boto.connect_s3(args['access_key'],args['secret_key'])
+            self.bucket = self.s3.get_bucket(args['bucket'].strip(),validate=False) if 'bucket' in args else None
+            # self.path = args['path']
+            self.filter = args['filter'] if 'filter' in args else None
+            self.filename = args['file'] if 'file' in args else None
+
+        except Exception as e :
+            self.s3 = None
+            self.bucket = None
+            print (e)
+
+    def buckets(self):
+        """
+        This function is a wrapper around the list of buckets for s3
+        """
+        return self.s3.get_all_buckets()
+
+
+class s3Reader(s3,Reader) :
+    """
+    Because s3 contains buckets and files, reading becomes a tricky proposition :
+    - list files if file is None
+    - stream content if file is Not None
+    @TODO: support read from all buckets, think about it
+    """
+    def __init__(self,args) :
+        s3.__init__(self,args)
+    def files(self):
+        r = []
+        try:
+            return [item.name for item in self.bucket if item.size > 0]
+        except Exception as e:
+            pass
+        return r
+    def stream(self,limit=-1):
+        """
+        At this point we should stream a file from a given bucket
+        """
+        key = self.bucket.get_key(self.filename.strip())
+        if key is None :
+            yield None
+        else:
+            count = 0
+            with smart_open(key) as remote_file:
+                for line in remote_file:
+                    if count == limit and limit > 0 :
+                        break
+                    yield line
+                    count += 1
+    def read(self,limit=-1) :
+        if self.filename is None :
+            #
+            # returning the list of files because no single file was specified.
+            return self.files()
+        else:
+            return self.stream(limit)
+
+class s3Writer(s3,Writer) :
+    def __init__(self,args) :
+        s3.__init__(self,args)
diff --git a/transport/s3.pyc b/transport/s3.pyc
new file mode 100644
index 0000000..baa8a00
Binary files /dev/null and b/transport/s3.pyc differ
diff --git a/transport/session.py b/transport/session.py
new file mode 100644
index 0000000..5ca833a
--- /dev/null
+++ b/transport/session.py
@@ -0,0 +1,66 @@
+from flask import request, session
+from datetime import datetime
+import re
+from common import Reader, Writer
+import json
+
+class HttpRequestReader(Reader):
+    """
+    This class is designed to read data from an Http request file handler provided to us by flask
+    The file will be held in memory and processed accordingly
+    NOTE: This is inefficient and can crash a micro-instance (be careful)
+    """
+
+    def __init__(self,**params):
+        self.file_length = 0
+        try:
+
+            #self.file = params['file']
+            #self.file.seek(0, os.SEEK_END)
+            #self.file_length = self.file.tell()
+
+            #print 'size of file ',self.file_length
+            self.content = params['file'].readlines()
+            self.file_length = len(self.content)
+        except Exception, e:
+            print "Error ... ",e
+            pass
+
+    def isready(self):
+        return self.file_length > 0
+    def read(self,size =-1):
+        i = 0
+        for row in self.content:
+            if i == size:
+                break
+            yield row
+            i += 1
+
+class HttpSessionWriter(Writer):
+    """
+    This class is designed to write data to a session/cookie
+    """
+    def __init__(self,**params):
+        """
+        @param queue    required session variable (acts as the session store)
+        """
+        self.session = params['queue']
+        self.session['sql'] = []
+        self.session['csv'] = []
+        self.tablename = re.sub('\..+$','',params['filename'])
+        self.session['uid'] = params['uid']
+        #self.xchar = params['xchar']
+
+
+    def format_sql(self,row):
+        values = "','".join([col.replace('"','').replace("'",'') for col in row])
+        return "".join(["INSERT INTO :table VALUES('",values,"');\n"]).replace(':table',self.tablename)
+    def isready(self):
+        return True
+    def write(self,**params):
+        label = params['label']
+        row = params ['row']
+
+        if label == 'usable':
+            self.session['csv'].append(self.format(row,','))
+            self.session['sql'].append(self.format_sql(row))
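+#
+# A hypothetical usage sketch from within a flask request handler (the
+# session object, filename and uid are illustrative) :
+#
+#   from flask import session
+#   writer = HttpSessionWriter(queue=session,filename='sample.csv',uid='demo')
+#   writer.write(label='usable',row=['1','2','3'])
+#   # session['csv'] -> ['1,2,3\n'] ; session['sql'] -> ["INSERT INTO sample VALUES('1','2','3');\n"]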