From 3e77175d06fc4630965f4206dd25aa98b281f4d3 Mon Sep 17 00:00:00 2001 From: "Steve L. Nyemba" Date: Mon, 25 Sep 2017 21:55:34 -0500 Subject: [PATCH 1/3] TR - added s3 handling (reader) --- transport.py | 96 +++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 83 insertions(+), 13 deletions(-) diff --git a/transport.py b/transport.py index 8ed1370..ea13bff 100644 --- a/transport.py +++ b/transport.py @@ -11,6 +11,9 @@ from couchdbkit import Server import re from csv import reader from datetime import datetime +import boto +import botocore +from smart_open import smart_open """ @TODO: Write a process by which the class automatically handles reading and creating a preliminary sample and discovers the meta data """ @@ -629,12 +632,81 @@ class CouchdbWriter(Couchdb,Writer): name = '-'.join([document['_id'] , now,'.json']) self.dbase.save_doc(document) self.dbase.put_attachment(document,content,name,'application/json') +class s3 : + """ + @TODO: Implement a search function for a file given a bucket?? 
+ """ + def __init__(self,args) : + """ + This function will extract a file or set of files from s3 bucket provided + @param access_key + @param secret_key + @param path location of the file + @param filter filename or filtering elements + """ + try: + self.s3 = boto.connect_s3(args['access_key'],args['secret_key']) + self.bucket = self.s3.get_bucket(args['bucket']) if 'bucket' in args else None + # self.path = args['path'] + self.filter = args['filter'] if 'filter' in args else None + self.filename = args['file'] if 'file' in args else None + + except Exception as e : + self.s3 = None + self.bucket = None + print e + def buckets(self): + """ + This function is a wrapper around the bucket list of buckets for s3 + + """ + return self.s3.get_all_buckets() + + +class s3Reader(s3,Reader) : + """ + Because s3 contains buckets and files, reading becomes a tricky proposition : + - list files if file is None + - stream content if file is Not None + @TODO: support read from all buckets, think about it + """ + def __init__(self,args) : + s3.__init__(self,args) + def files(self): + r = [] + try: + return [item.name for item in self.bucket if item.size > 0] + except Exception as e: + pass + return r + def stream(self,limit=-1): + """ + At this point we should stream a file from a given bucket + """ + key = self.bucket.get_key(self.filename.strip()) + if key is None : + yield None + else: + count = 0 + with smart_open(key) as remote_file: + for line in remote_file: + if count == limit and limit > 0 : + break + yield line + count += 1 + def read(self,limit=-1) : + if self.filename is None : + # + # returning the list of files because no one file was specified. 
+ return self.files() + else: + return self.stream(10) """ This class acts as a factory to be able to generate an instance of a Reader/Writer Against a Queue,Disk,Cloud,Couchdb The class doesn't enforce parameter validation, thus any error with the parameters sent will result in a null Object """ -class DataSourceFactory: +class Factory: def instance(self,**args): source = args['type'] params = args['args'] @@ -659,6 +731,10 @@ class DataSourceFactory: except Exception,e: print ['Error ',e] return anObject +class s3Writer(s3,Writer) : + def __init__(self,args) : + s3.__init__(self,args) + """ This class implements a data-source handler that is intended to be used within the context of data processing, it allows to read/write anywhere transparently. The class is a facade to a heterogeneous class hierarchy and thus simplifies how the calling code interacts with the class hierarchy @@ -671,16 +747,10 @@ class DataSource: return self.Input.read(size) def write(self,**args): self.Output.write(**args) -#p = {} -#p['host'] = 'dev.the-phi.com' -#p['uid'] = 'nyemba@gmail.com' -#p['qid'] = 'repair' -#factory = DataSourceFactory() -#o = factory.instance(type='QueueReader',args=p) -#print o is None -#q = QueueWriter(host='dev.the-phi.com',uid='nyemba@gmail.com') -#q.write(object='steve') -#q.write(object='nyemba') -#q.write(object='elon') - +# conf = json.loads(open('conf.json').read()) +# x = s3Reader( dict(conf,**{'bucket':'com.phi.sample.data','file':'Sample-Spreadsheet-5000-rows.csv'})) +# r = x.read() +# for item in r : +# print item +#print buckets[1].get_key('Sample-Spreadsheet-5000-rows.csv') From cf6f017b91f6db995f124d7b8e1e9c0ccace23e9 Mon Sep 17 00:00:00 2001 From: "Steve L. Nyemba" Date: Tue, 26 Sep 2017 15:54:26 -0500 Subject: [PATCH 2/3] S3 - Bug fix? 
Not sure --- transport.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) mode change 100644 => 100755 transport.py diff --git a/transport.py b/transport.py old mode 100644 new mode 100755 index ea13bff..c6bd085 --- a/transport.py +++ b/transport.py @@ -646,7 +646,7 @@ class s3 : """ try: self.s3 = boto.connect_s3(args['access_key'],args['secret_key']) - self.bucket = self.s3.get_bucket(args['bucket']) if 'bucket' in args else None + self.bucket = self.s3.get_bucket(args['bucket'].strip(),validate=False) if 'bucket' in args else None # self.path = args['path'] self.filter = args['filter'] if 'filter' in args else None self.filename = args['file'] if 'file' in args else None @@ -747,9 +747,11 @@ class DataSource: return self.Input.read(size) def write(self,**args): self.Output.write(**args) -# conf = json.loads(open('conf.json').read()) -# x = s3Reader( dict(conf,**{'bucket':'com.phi.sample.data','file':'Sample-Spreadsheet-5000-rows.csv'})) - +conf = json.loads(open('config.json').read()) +#x = s3Reader( dict(conf,**{'bucket':'com.phi.sample.data','file':'Sample-Spreadsheet-5000-rows.csv'})) +x = s3Reader(conf) +print conf +print x.bucket.get_all_keys() # r = x.read() # for item in r : # print item From 8d4ecd7a9f31fa72a08fbb68f28cdfee7bd8ced2 Mon Sep 17 00:00:00 2001 From: "Steve L. 
Nyemba" Date: Tue, 26 Sep 2017 16:10:14 -0500 Subject: [PATCH 3/3] S3 Requirements file --- requirements.txt | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 requirements.txt diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..4e72ea4 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,35 @@ +asn1crypto==0.23.0 +boto==2.48.0 +boto3==1.4.7 +botocore==1.7.17 +bz2file==0.98 +certifi==2017.7.27.1 +cffi==1.11.0 +chardet==3.0.4 +click==6.7 +couchdbkit==0.6.5 +cryptography==2.0.3 +docutils==0.14 +enum34==1.1.6 +Flask==0.12.2 +futures==3.1.1 +http-parser==0.8.3 +idna==2.6 +ipaddress==1.0.18 +itsdangerous==0.24 +Jinja2==2.9.6 +jmespath==0.9.3 +MarkupSafe==1.0 +numpy==1.13.1 +pika==0.11.0 +pycparser==2.18 +pyOpenSSL==17.3.0 +python-dateutil==2.6.1 +requests==2.18.4 +restkit==4.2.2 +s3transfer==0.1.11 +six==1.11.0 +smart-open==1.5.3 +socketpool==0.5.3 +urllib3==1.22 +Werkzeug==0.12.2