diff --git a/setup.py b/setup.py
index fbcde4c..5768b66 100644
--- a/setup.py
+++ b/setup.py
@@ -6,7 +6,12 @@ import os
 import sys
 def read(fname):
 	return open(os.path.join(os.path.dirname(__file__), fname)).read()
-args = {"name":"data-transport","version":"1.0.0","author":"The Phi Technology LLC","author_email":"info@the-phi.com","license":"MIT","packages":["transport"]}
+args = {
+	"name":"data-transport",
+	"version":"1.0.8",
+	"author":"The Phi Technology LLC","author_email":"info@the-phi.com",
+	"license":"MIT",
+	"packages":["transport"]}
 args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3']
 args["install_requires"] = ['pymongo','numpy','cloudant','pika','boto','flask-session','smart_open']
 args["url"] = "https://dev.the-phi.com/git/steve/data-transport.git"
diff --git a/transport/common.py b/transport/common.py
index e49fbdb..0ad7fd4 100644
--- a/transport/common.py
+++ b/transport/common.py
@@ -14,7 +14,8 @@ Requirements :
 	pymongo
 	boto
-	couldant
-
+	cloudant
+@TODO:
+	Enable reading from / writing to multiple data stores at once
 """
 __author__ = 'The Phi Technology'
 import numpy as np
@@ -22,107 +23,72 @@ import json
 import importlib
 # import couch
 # import mongo
-class Reader:
-	def __init__(self):
-		self.nrows = 0
-		self.xchar = None
-
-	def row_count(self):
-		content = self.read()
-		return np.sum([1 for row in content])
-	def delimiter(self,sample):
+class IO:
+	def init(self,**args):
 		"""
-		This function determines the most common delimiter from a subset of possible delimiters.
-		It uses a statistical approach (distribution) to guage the distribution of columns for a given delimiter
-
-		:sample	sample  string/content expecting matrix i.e list of rows
+		This function allows attributes to be changed at runtime; only attributes already defined on the instance can be changed.
+		Adding new attributes would require sub-classing, otherwise the class could become unpredictable ...
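+
+		e.g. (illustrative sketch; assumes a sub-class instance that already defines a path attribute):
+			writer.init(path='/tmp/out.log')	# path exists on the instance, so it is updated
+			writer.init(foo='bar')			# foo is not a predefined attribute, so it is ignored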
""" - - m = {',':[],'\t':[],'|':[],'\x3A':[]} - delim = list(m.keys()) - for row in sample: - for xchar in delim: - if row.split(xchar) > 1: - m[xchar].append(len(row.split(xchar))) - else: - m[xchar].append(0) - - - - # - # The delimiter with the smallest variance, provided the mean is greater than 1 - # This would be troublesome if there many broken records sampled - # - m = {id: np.var(m[id]) for id in list(m.keys()) if m[id] != [] and int(np.mean(m[id]))>1} - index = list(m.values()).index( min(m.values())) - xchar = list(m.keys())[index] - - return xchar - def col_count(self,sample): - """ - This function retirms the number of columns of a given sample - @pre self.xchar is not None + allowed = list(vars(self).keys()) + for field in args : + if field not in allowed : + continue + value = args[field] + setattr(self,field,value) +class Reader (IO): + """ + This class is an abstraction of a read functionalities of a data store + """ + def __init__(self): + pass + def meta(self): """ - - m = {} - i = 0 - - for row in sample: - row = self.format(row) - id = str(len(row)) - #id = str(len(row.split(self.xchar))) - - if id not in m: - m[id] = 0 - m[id] = m[id] + 1 - - index = list(m.values()).index( max(m.values()) ) - ncols = int(list(m.keys())[index]) - - - return ncols; - def format (self,row): + This function is intended to return meta-data associated with what has just been read + @return object of meta data information associated with the content of the store """ - This function will clean records of a given row by removing non-ascii characters - @pre self.xchar is not None + raise Exception ("meta function needs to be implemented") + def read(**args): """ - - if isinstance(row,list) == False: - # - # We've observed sometimes fields contain delimiter as a legitimate character, we need to be able to account for this and not tamper with the field values (unless necessary) - cols = self.split(row) - #cols = row.split(self.xchar) - else: - cols = row ; - return [ re.sub('[^\x00-\x7F,\n,\r,\v,\b,]',' ',col.strip()).strip().replace('"','') for col in cols] - - def split (self,row): + This function is intended to read the content of a store provided parameters to be used at the discretion of the subclass """ - This function performs a split of a record and tries to attempt to preserve the integrity of the data within i.e accounting for the double quotes. - @pre : self.xchar is not None - """ - - pattern = "".join(["(?:^|",self.xchar,")(\"(?:[^\"]+|\"\")*\"|[^",self.xchar,"]*)"]) - return re.findall(pattern,row.replace('\n','')) + raise Exception ("read function needs to be implemented") -class Writer: - +class Writer(IO): + def __init__(self): + self.cache = {"default":[]} + def log(self,**args): + self.cache[id] = args + def meta (self,id="default",**args): + raise Exception ("meta function needs to be implemented") def format(self,row,xchar): if xchar is not None and isinstance(row,list): return xchar.join(row)+'\n' elif xchar is None and isinstance(row,dict): row = json.dumps(row) return row - """ + def write(self,**args): + """ + This function will write content to a store given parameters to be used at the discretion of the sub-class + """ + raise Exception ("write function needs to be implemented") + + def archive(self): + """ It is important to be able to archive data so as to insure that growth is controlled Nothing in nature grows indefinitely neither should data being handled. 
-	"""
-	def archive(self):
-		pass
-	def flush(self):
+		"""
+		raise Exception ("archive function needs to be implemented")
+	def close(self):
+		"""
+		This function will close the persistent storage connection/handler
+		"""
 		pass
-
+class ReadWriter(Reader,Writer) :
+	"""
+	This class aggregates the read and write functions into a single class
+	"""
+	pass
 # class factory :
 # 	@staticmethod
 # 	def instance(**args):
diff --git a/transport/couch.py b/transport/couch.py
index 7aaf93e..9e9bf93 100644
--- a/transport/couch.py
+++ b/transport/couch.py
@@ -15,13 +15,13 @@ else:
 class Couch:
 	"""
 	This class is a wrapper for read/write against couchdb. The class captures common operations for read/write.
-	@param	url	host & port reference
+	@param	url	host & port reference (default http://localhost:5984)
 	@param	doc	user id involved
 	@param	dbname	database name (target)
 	"""
 	def __init__(self,**args):
-		url		= args['url']
-		self.uid	= args['doc']
+		url		= args['url'] if 'url' in args else 'http://localhost:5984'
+		self._id	= args['doc']
 		dbname		= args['dbname']
 		if 'username' not in args and 'password' not in args :
 			self.server	= cloudant.CouchDB(None,None,url=url)
@@ -34,9 +34,9 @@ class Couch:
 			#
 			# @TODO Check if the database exists ...
 			#
-			doc  = cloudant.document.Document(self.dbase,self.uid) #self.dbase.get(self.uid)
+			doc  = cloudant.document.Document(self.dbase,self._id) #self.dbase.get(self._id)
 			if not doc.exists():
-				doc = self.dbase.create_document({"_id":self.uid})
+				doc = self.dbase.create_document({"_id":self._id})
 				doc.save()
 		else:
 			self.dbase = None
@@ -51,8 +51,8 @@ class Couch:
 		# At this point we are sure that the server is connected
 		# We are also sure that the database actually exists
 		#
-		doc = cloudant.document.Document(self.dbase,self.uid)
-		# q = self.dbase.all_docs(key=self.uid)['rows']
-		# if not q :
+		doc = cloudant.document.Document(self.dbase,self._id)
+		# q = self.dbase.all_docs(key=self._id)['rows']
+		# if not q :
 		if not doc.exists():
 			return False
@@ -107,7 +107,7 @@ class CouchReader(Couch,Reader):
 #		#
 #		# We insure the document of the given user has the requested attachment.
 #		#
-#		doc = self.dbase.get(self.uid)
+#		doc = self.dbase.get(self._id)
 #		if '_attachments' in doc:
 #			r = self.filename in doc['_attachments'].keys()
@@ -120,8 +120,8 @@ class CouchReader(Couch,Reader):
 		#
 		# @TODO Need to get this working ...
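+		# Assumption: get_attachment returns an iterable of rows, hence the re-yield below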
 		#
-		document = cloudant.document.Document(self.dbase,self.uid)
-		# content  = self.dbase.fetch_attachment(self.uid,self.filename).split('\n') ;
+		document = cloudant.document.Document(self.dbase,self._id)
+		# content  = self.dbase.fetch_attachment(self._id,self.filename).split('\n') ;
 		content = self.get_attachment(self.filename)
 		for row in content:
 			yield row
@@ -132,9 +132,9 @@ class CouchReader(Couch,Reader):
 		else:
 			return self.basic_read()
 	def basic_read(self):
-		document = cloudant.document.Document(self.dbase,self.uid)
+		document = cloudant.document.Document(self.dbase,self._id)
 
-		# document = self.dbase.get(self.uid)
+		# document = self.dbase.get(self._id)
 		if document.exists() :
 			document.fetch()
 			document = dict(document)
@@ -157,32 +157,62 @@ class CouchWriter(Couch,Writer):
 		"""
 		Couch.__init__(self,**args)
-
-	def write(self,**params):
+	def set(self,info):
+		document = cloudant.document.Document(self.dbase,self._id)
+		if document.exists() :
+			keys = list(set(document.keys()) - set(['_id','_rev','_attachments']))
+			for id in keys :
+				document.field_set(document,id,None)
+			for id in info :
+				value = info[id]
+				document.field_set(document,id,value)
+
+			document.save()
+		else:
+			_document = dict({"_id":self._id},**info)
+			self.dbase.create_document(_document)
+	def write(self,info):
 		"""
 		write a given attribute to a document database
-		@param	label	scope of the row repair|broken|fixed|stats
-		@param	row	row to be written
+		@param	info	object whose attributes are to be written to the document
 		"""
-		# document = self.dbase.get(self.uid)
-		document = cloudant.document.Document(self.dbase,self.uid) #.get(self.uid)
+		# document = self.dbase.get(self._id)
+		document = cloudant.document.Document(self.dbase,self._id) #.get(self._id)
 		if document.exists() is False :
-			document = self.dbase.create_document({"_id":self.uid})
-		label = params['label']
-		row = params['row']
-		if label not in document :
-			document[label] = []
-		document[label].append(row)
+			document = self.dbase.create_document({"_id":self._id})
+		# label = params['label']
+		# row = params['row']
+		# if label not in document :
+		# 	document[label] = []
+		# document[label].append(row)
+		for key in info :
+			if key in document and type(document[key]) == list :
+				document[key] += info[key]
+			else:
+				document[key] = info[key]
+
 		document.save()
 		# self.dbase.bulk_docs([document])
 		# self.dbase.save_doc(document)
-
+
+	def upload(self,**args):
+		"""
+		:param	filename	name of the file to be uploaded
+		:param	data		content of the file (binary or text)
+		:param	content_type	mime type (default text/plain)
+		"""
+		mimetype = args['content_type'] if 'content_type' in args else 'text/plain'
+		document = cloudant.document.Document(self.dbase,self._id)
+		document.put_attachment(args['filename'],mimetype,args['data'])
+		document.save()
+
+	def archive(self,params=None):
+		"""
+		This function will archive the document onto itself.
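+		i.e. the current fields are serialized and attached back to the same document as a
+		timestamped JSON attachment (a sketch of the intent; see the upload call below).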
""" - # document = self.dbase.all_docs(self.uid,include_docs=True) + # document = self.dbase.all_docs(self._id,include_docs=True) document = cloudant.document.Document(self.dbase,self.filename) document.fetch() content = {} @@ -196,8 +226,9 @@ class CouchWriter(Couch,Writer): # document= _doc now = str(datetime.today()) - name = '-'.join([document['_id'] , now,'.json']) + name = '-'.join([document['_id'] , now,'.json']) + self.upload(filename=name,data=content,content_type='application/json') # self.dbase.bulk_docs([document]) # self.dbase.put_attachment(document,content,name,'application/json') - document.put_attachment(self.dbase,name,'application/json',content) - document.save() + # document.put_attachment(self.dbase,name,'application/json',content) + # document.save() diff --git a/transport/disk.py b/transport/disk.py index be17550..a051045 100644 --- a/transport/disk.py +++ b/transport/disk.py @@ -14,8 +14,8 @@ class DiskReader(Reader) : """ Reader.__init__(self) - self.path = params['path'] ; - + self.path = params['path'] ; + self.delimiter = params['delimiter'] if 'delimiter' in params else None def isready(self): return os.path.exists(self.path) def read(self,size=-1): @@ -31,55 +31,54 @@ class DiskReader(Reader) : i += 1 if size == i: break + if self.delimiter : + yield row.split(self.char) yield row f.close() class DiskWriter(Writer): """ - This function writes output to disk in a designated location + This function writes output to disk in a designated location. The function will write a text to a text file + - If a delimiter is provided it will use that to generate a xchar-delimited file + - If not then the object will be dumped as is """ def __init__(self,**params): + Writer.__init__(self) + self.cache['meta'] = {'cols':0,'rows':0,'delimiter':None} if 'path' in params: self.path = params['path'] else: - self.path = None - if 'name' in params: - self.name = params['name']; - else: - self.name = 'out.log' + self.path = 'data-transport.log' + self.delimiter = params['delimiter'] if 'delimiter' in params else None + # if 'name' in params: + # self.name = params['name']; + # else: + # self.name = 'data-transport.log' # if os.path.exists(self.path) == False: # os.mkdir(self.path) - + def meta(self): + return self.cache['meta'] def isready(self): """ This function determines if the class is ready for execution or not i.e it determines if the preconditions of met prior execution """ - - p = self.path is not None and os.path.exists(self.path) - q = self.name is not None - return p and q - def write(self,**params): + return True + # p = self.path is not None and os.path.exists(self.path) + # q = self.name is not None + # return p and q + def format (self,row): + self.cache['meta']['cols'] += len(row) if isinstance(row,list) else len(row.keys()) + self.cache['meta']['rows'] += 1 + return (self.delimiter.join(row) if self.delimiter else json.dumps(row))+"\n" + def write(self,info): """ This function writes a record to a designated file @param label @param row row to be written """ - - # label = params['label'] - row = params['row'] - # xchar = None - # if 'xchar' is not None: - # xchar = params['xchar'] - #path = ''.join([self.path,os.sep,label]) - # path = ''.join([self.path,os.sep,self.name]) - #if os.path.exists(path) == False: - # os.mkdir(path) ; - # path = ''.join([path,os.sep,self.name]) f = open(self.path,'a') - if isinstance(row,object): - row = json.dumps(row) - #row = self.format(row,xchar); - f.write(row+"\n") + f.write(self.format(info)) f.close() + diff --git 
diff --git a/transport/mongo.py b/transport/mongo.py
index e363008..ce7165d 100644
--- a/transport/mongo.py
+++ b/transport/mongo.py
@@ -4,7 +4,12 @@ Steve L. Nyemba, The Phi Technology LLC
 This file is a wrapper around mongodb for reading/writing content against a mongodb server and executing views (mapreduce)
 """
-from pymongo import MongoClient
+from pymongo import MongoClient
+from bson.objectid import ObjectId
+from bson.binary import Binary
+import json
+from datetime import datetime
+import gridfs
 # from transport import Reader,Writer
 import sys
 if sys.version_info[0] > 2 :
@@ -19,11 +24,11 @@ class Mongo :
 	def __init__(self,**args):
 		"""
 		:dbname		database name/identifier
-		:host		host and port of the database
+		:host		host and port of the database (default localhost:27017)
 		:username	username for authentication
 		:password	password for current user
 		"""
-		host = args['host']
+		host = args['host'] if 'host' in args else 'localhost:27017'
 		if 'user' in args and 'password' in args:
 			self.client = MongoClient(host,
 				username=args['username'] ,
 				password=args['password'] ,
 				authMechanism='SCRAM-SHA-256')
 		else:
-			self.client = MongoClient()
+			self.client = MongoClient(host)
 		self.uid = args['doc']	#-- document identifier
 		self.dbname = args['dbname']
@@ -62,17 +67,67 @@ class MongoWriter(Mongo,Writer):
 	"""
 	def __init__(self,**args):
 		Mongo.__init__(self,**args)
-	def write(self,**args):
+	def upload(self,**args) :
+		"""
+		This function will upload a file to the current database (using GridFS)
+		:param	data		binary stream/text to be stored
+		:param	filename	filename to be used
+		:param	encoding	content_encoding (default utf-8)
+		"""
+		if 'encoding' not in args :
+			args['encoding'] = 'utf-8'
+		gfs = gridfs.GridFS(self.db)
+		gfs.put(**args)
+
+	def archive(self):
+		"""
+		This function will archive the documents of the current collection into a GridFS file, then empty the collection
+		"""
+		collection = self.db[self.uid]
+		rows = list(collection.find())
+		for row in rows :
+			if type(row['_id']) == ObjectId :
+				row['_id'] = str(row['_id'])
+		stream = Binary(json.dumps(rows).encode())
+		collection.delete_many({})
+		now = "-".join([str(datetime.now().year),str(datetime.now().month),str(datetime.now().day)])
+		name = ".".join([self.uid,'archive',now])+".json"
+		description = " ".join([self.uid,'archive',str(len(rows))])
+		self.upload(filename=name,data=stream,description=description,content_type='application/json')
+		# gfs = gridfs.GridFS(self.db)
+		# gfs.put(filename=name,description=description,data=stream,encoding='utf-8')
+		# self.write({"filename":name,"file":stream,"description":description})
+
+	def write(self,info):
+		"""
+		This function will write to a given collection, i.e. add a record to a collection (no updates)
+		@param	info	new record to be added to the collection
+		"""
 		# document = self.db[self.uid].find()
 		collection = self.db[self.uid]
-		if type(args['row']) == list :
-			self.db[self.uid].insert_many(args['row'])
+		if type(info) == list :
+			self.db[self.uid].insert_many(info)
 		else:
-			self.db[self.uid].insert_one(args['row'])
+			self.db[self.uid].insert_one(info)
 	def set(self,document):
+		"""
+		if no identifier is provided the function will delete the entire collection and set the new document.
+		Please use this function with great care (archive the content first before using it... for safety)
+		"""
+
 		collection = self.db[self.uid]
-		if collection.count_document() > 0 :
-			collection.delete({_id:self.uid})
-
-		collecton.update_one({"_id":self.uid},document,True)
+		if collection.count_documents({}) > 0 and '_id' in document:
+			id = document['_id']
+			del document['_id']
+			collection.find_one_and_replace({'_id':id},document)
+		else:
+			collection.delete_many({})
+			self.write(document)
+		# collection.update_one({"_id":self.uid},document,True)
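
--
Usage sketch (not part of the patch): the module path and parameters below are assumed from
the classes in this diff, against a MongoDB instance on localhost:27017.

	from transport.mongo import MongoWriter

	writer = MongoWriter(dbname='demo',doc='logs')	# host defaults to localhost:27017
	writer.write({"event":"started","value":1})	# insert_one into the 'logs' collection
	writer.write([{"event":"a"},{"event":"b"}])	# insert_many when given a list
	writer.archive()				# snapshot the collection to GridFS, then empty it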