diff --git a/transport/__init__.py b/transport/__init__.py
index 2502240..e5a3418 100644
--- a/transport/__init__.py
+++ b/transport/__init__.py
@@ -50,10 +50,11 @@ import sys
 if sys.version_info[0] > 2 :
     from transport.common import Reader, Writer #, factory
     from transport import disk
-    from transport import queue as queue
+
+    from transport import s3 as s3
+    from transport import rabbitmq as queue
     from transport import couch as couch
     from transport import mongo as mongo
-    from transport import s3 as s3
 else:
     from common import Reader, Writer #, factory
     import disk
diff --git a/transport/common.py b/transport/common.py
index 0ad7fd4..6e595ae 100644
--- a/transport/common.py
+++ b/transport/common.py
@@ -47,7 +47,7 @@ class Reader (IO):
         @return object of meta data information associated with the content of the store
         """
         raise Exception ("meta function needs to be implemented")
-    def read(**args):
+    def read(self,**args):
         """
         This function is intended to read the content of a store provided parameters to be used at the discretion of the subclass
         """
diff --git a/transport/disk.py b/transport/disk.py
index a051045..00c4a87 100644
--- a/transport/disk.py
+++ b/transport/disk.py
@@ -1,5 +1,9 @@
 import os
-from .__init__ import Reader,Writer
+import sys
+if sys.version_info[0] > 2 :
+    from transport.common import Reader, Writer #, factory
+else:
+    from common import Reader,Writer
 import json
 
 class DiskReader(Reader) :
@@ -18,12 +22,12 @@ class DiskReader(Reader) :
         self.delimiter = params['delimiter'] if 'delimiter' in params else None
     def isready(self):
         return os.path.exists(self.path)
-    def read(self,size=-1):
+    def read(self,**args):
         """
         This function reads the rows from a designated location on disk
         @param size number of rows to be read, -1 suggests all rows
         """
-
+        size = -1 if 'size' not in args else int(args['size'])
         f = open(self.path,'rU')
         i = 1
         for row in f:
diff --git a/transport/mongo.py b/transport/mongo.py
index ce7165d..48c1bc8 100644
--- a/transport/mongo.py
+++ b/transport/mongo.py
@@ -39,7 +39,7 @@ class Mongo :
         self.client = MongoClient(host)
 
         self.uid    = args['doc']   #-- document identifier
-        self.dbname = args['dbname']
+        self.dbname = args['dbname'] if 'dbname' in args else args['db']
         self.db = self.client[self.dbname]
 
     def isready(self):
@@ -53,9 +53,10 @@ class MongoReader(Mongo,Reader):
     """
     def __init__(self,**args):
         Mongo.__init__(self,**args)
-    def read(self,size=-1):
+    def read(self,**args):
         collection = self.db[self.uid]
-        return collection.find({})
+        _filter = args['filter'] if 'filter' in args else {}
+        return collection.find(_filter)
     def view(self,**args):
         """
         This function is designed to execute a view (map/reduce) operation
diff --git a/transport/queue.py b/transport/rabbitmq.py
similarity index 98%
rename from transport/queue.py
rename to transport/rabbitmq.py
index 485b771..41d016a 100644
--- a/transport/queue.py
+++ b/transport/rabbitmq.py
@@ -183,7 +183,7 @@ class QueueReader(MessageQueue,Reader):
 
         if self.size == len(self.data[qid]) or len(self.data[qid]) == self.info.method.message_count:
             self.close()
-    def read(self,size=-1):
+    def read(self,**args):
         """
         This function will read, the first message from a queue
         @TODO:
@@ -191,7 +191,7 @@ class QueueReader(MessageQueue,Reader):
             Have the number of messages retrieved be specified by size (parameter)
         """
         r = {}
-        self.size = size
+        self.size = -1 if 'size' not in args else int(args['size'])
         #
         # We enabled the reader to be able to read from several queues (sequentially for now)
         # The qid parameter will be an array of queues the reader will be reading from
diff --git a/transport/s3.py b/transport/s3.py
index 19d98b6..1a67317 100644
--- a/transport/s3.py
+++ b/transport/s3.py
@@ -6,6 +6,8 @@ This file is a wrapper around s3 bucket provided by AWS for reading and writing
 """
 from datetime import datetime
 import boto
+from boto.s3.connection import S3Connection, OrdinaryCallingFormat
+import numpy as np
 import botocore
 from smart_open import smart_open
 import sys
@@ -14,13 +16,14 @@ if sys.version_info[0] > 2 :
 else:
     from common import Reader, Writer
 import json
-
+from io import StringIO
+import json
 class s3 :
     """
         @TODO: Implement a search function for a file given a bucket??
     """
-    def __init__(self,args) :
+    def __init__(self,**args) :
         """
             This function will extract a file or set of files from s3 bucket provided
             @param access_key
@@ -29,18 +32,39 @@ class s3 :
             @param filter filename or filtering elements
         """
         try:
-            self.s3 = boto.connect_s3(args['access_key'],args['secret_key'])
+            self.s3 = S3Connection(args['access_key'],args['secret_key'],calling_format=OrdinaryCallingFormat())
             self.bucket = self.s3.get_bucket(args['bucket'].strip(),validate=False) if 'bucket' in args else None
             # self.path = args['path']
             self.filter = args['filter'] if 'filter' in args else None
             self.filename = args['file'] if 'file' in args else None
+            self.bucket_name = args['bucket'] if 'bucket' in args else None
 
         except Exception as e :
             self.s3 = None
             self.bucket = None
             print (e)
+    def meta(self,**args):
+        """
+        :name name of the bucket
+        """
+        info = self.list(**args)
+        [item.open() for item in info]
+        return [{"name":item.name,"size":item.size} for item in info]
+    def list(self,**args):
+        """
+        This function will list the content of a bucket, the bucket must be provided by the name
+        :name name of the bucket
+        """
+        return list(self.s3.get_bucket(args['name']).list())
+    def buckets(self):
+        #
+        # This function will return all buckets; use it cautiously since it
+        # exposes everything the credentials can reach
+        #
+        return [item.name for item in self.s3.get_all_buckets()]
+
     # def buckets(self):
         pass
     # """
@@ -56,8 +80,8 @@ class s3Reader(s3,Reader) :
         - stream content if file is Not None
         @TODO: support read from all buckets, think about it
     """
-    def __init__(self,args) :
-        s3.__init__(self,args)
+    def __init__(self,**args) :
+        s3.__init__(self,**args)
     def files(self):
         r = []
         try:
@@ -80,14 +104,31 @@ class s3Reader(s3,Reader) :
                     break
                 yield line
                 count += 1
-    def read(self,limit=-1) :
+    def read(self,**args) :
         if self.filename is None :
             #
             # returning the list of files because no one file was specified.
             return self.files()
         else:
-            return self.stream(10)
+            limit = args['size'] if 'size' in args else -1
+            return self.stream(limit)
 
 class s3Writer(s3,Writer) :
-    def __init__(self,args) :
-        s3.__init__(self,args)
+
+    def __init__(self,**args) :
+        s3.__init__(self,**args)
+    def mkdir(self,name):
+        """
+        This function will create a folder in a bucket
+        :name name of the folder
+        """
+        self.bucket.new_key(name+'/').set_contents_from_string('')
+    def write(self,content):
+        key = self.bucket.new_key(self.filename)
+        key.set_contents_from_string(content)
+
+if __name__ == '__main__' :
+    p = {'access_key':'<ACCESS_KEY>','secret_key':'<SECRET_KEY>'}
+    reader = s3Reader(**p)
+    buckets = reader.buckets()
+    print(reader.list(name = buckets[0]))
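
A note on the queue to rabbitmq rename: the Python 3 branch of transport/__init__.py re-imports the module under its old name (from transport import rabbitmq as queue), so existing call sites that reference transport.queue keep working. A quick check of that invariant:

    import transport
    # Both attributes resolve to the same module object after this patch.
    assert transport.queue is transport.rabbitmq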
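
Every Reader subclass now shares the read(self,**args) signature, with reader-specific options passed as keywords. A minimal usage sketch, assuming DiskReader accepts 'path'/'delimiter' and the Mongo base class accepts a 'host' keyword (neither constructor is fully shown in this diff); the path, database, and collection names are placeholders:

    from transport.disk import DiskReader
    from transport.mongo import MongoReader

    # DiskReader: 'size' caps the number of rows; omitting it reads everything.
    d_reader = DiskReader(path='/tmp/sample.csv', delimiter=',')
    rows = list(d_reader.read(size=10))

    # MongoReader: 'filter' is forwarded to collection.find(); default is {}.
    m_reader = MongoReader(host='localhost:27017', db='sample', doc='users')
    records = list(m_reader.read(filter={'status': 'active'}))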
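
The reworked s3 classes likewise move to keyword construction. A hedged sketch of the reader side, with placeholder credentials and bucket/file names; read() streams the named file when 'file' is set and falls back to listing files otherwise:

    from transport.s3 import s3Reader

    reader = s3Reader(access_key='<ACCESS_KEY>', secret_key='<SECRET_KEY>',
                      bucket='my-bucket', file='data.csv')
    print(reader.buckets())                 # names of all buckets the key can see
    print(reader.meta(name='my-bucket'))    # [{'name': ..., 'size': ...}, ...]
    for line in reader.read(size=10):       # stream at most 10 lines of data.csv
        print(line)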