diff --git a/transport.py b/transport.py index 8ed1370..ea13bff 100644 --- a/transport.py +++ b/transport.py @@ -11,6 +11,9 @@ from couchdbkit import Server import re from csv import reader from datetime import datetime +import boto +import botocore +from smart_open import smart_open """ @TODO: Write a process by which the class automatically handles reading and creating a preliminary sample and discovers the meta data """ @@ -629,12 +632,81 @@ class CouchdbWriter(Couchdb,Writer): name = '-'.join([document['_id'] , now,'.json']) self.dbase.save_doc(document) self.dbase.put_attachment(document,content,name,'application/json') +class s3 : + """ + @TODO: Implement a search function for a file given a bucket?? + """ + def __init__(self,args) : + """ + This function will extract a file or set of files from s3 bucket provided + @param access_key + @param secret_key + @param path location of the file + @param filter filename or filtering elements + """ + try: + self.s3 = boto.connect_s3(args['access_key'],args['secret_key']) + self.bucket = self.s3.get_bucket(args['bucket']) if 'bucket' in args else None + # self.path = args['path'] + self.filter = args['filter'] if 'filter' in args else None + self.filename = args['file'] if 'file' in args else None + + except Exception as e : + self.s3 = None + self.bucket = None + print e + def buckets(self): + """ + This function is a wrapper around the bucket list of buckets for s3 + + """ + return self.s3.get_all_buckets() + + +class s3Reader(s3,Reader) : + """ + Because s3 contains buckets and files, reading becomes a tricky proposition : + - list files if file is None + - stream content if file is Not None + @TODO: support read from all buckets, think about it + """ + def __init__(self,args) : + s3.__init__(self,args) + def files(self): + r = [] + try: + return [item.name for item in self.bucket if item.size > 0] + except Exception as e: + pass + return r + def stream(self,limit=-1): + """ + At this point we should stream a file from a given bucket + """ + key = self.bucket.get_key(self.filename.strip()) + if key is None : + yield None + else: + count = 0 + with smart_open(key) as remote_file: + for line in remote_file: + if count == limit and limit > 0 : + break + yield line + count += 1 + def read(self,limit=-1) : + if self.filename is None : + # + # returning the list of files because no one file was specified. + return self.files() + else: + return self.stream(10) """ This class acts as a factory to be able to generate an instance of a Reader/Writer Against a Queue,Disk,Cloud,Couchdb The class doesn't enforce parameter validation, thus any error with the parameters sent will result in a null Object """ -class DataSourceFactory: +class Factory: def instance(self,**args): source = args['type'] params = args['args'] @@ -659,6 +731,10 @@ class DataSourceFactory: except Exception,e: print ['Error ',e] return anObject +class s3Writer(s3,Writer) : + def __init__(self,args) : + s3.__init__(self,args) + """ This class implements a data-source handler that is intended to be used within the context of data processing, it allows to read/write anywhere transparently. The class is a facade to a heterogeneous class hierarchy and thus simplifies how the calling code interacts with the class hierarchy @@ -671,16 +747,10 @@ class DataSource: return self.Input.read(size) def write(self,**args): self.Output.write(**args) -#p = {} -#p['host'] = 'dev.the-phi.com' -#p['uid'] = 'nyemba@gmail.com' -#p['qid'] = 'repair' -#factory = DataSourceFactory() -#o = factory.instance(type='QueueReader',args=p) -#print o is None -#q = QueueWriter(host='dev.the-phi.com',uid='nyemba@gmail.com') -#q.write(object='steve') -#q.write(object='nyemba') -#q.write(object='elon') - +# conf = json.loads(open('conf.json').read()) +# x = s3Reader( dict(conf,**{'bucket':'com.phi.sample.data','file':'Sample-Spreadsheet-5000-rows.csv'})) +# r = x.read() +# for item in r : +# print item +#print buckets[1].get_key('Sample-Spreadsheet-5000-rows.csv')