|
|
|
"""
|
|
|
|
Data Transport - 1.0
|
|
|
|
Steve L. Nyemba, The Phi Technology LLC
|
|
|
|
|
|
|
|
This file is a wrapper around mongodb for reading/writing content against a mongodb server and executing views (mapreduce)
|
|
|
|
"""
|
|
|
|
from pymongo import MongoClient
|
|
|
|
import bson
|
|
|
|
from bson.objectid import ObjectId
|
|
|
|
from bson.binary import Binary
|
|
|
|
# import nujson as json
|
|
|
|
from datetime import datetime
|
|
|
|
import pandas as pd
|
|
|
|
import numpy as np
|
|
|
|
self.client.close()
|
|
|
|
def meta(self, **_args):
    """
    Placeholder metadata hook.

    Any keyword arguments are accepted and ignored; this implementation
    always hands back a new, empty list.

    :return an empty list
    """
    return list()
|
|
|
|
class Reader(Mongo):
    """
    Reader for a mongodb data store.

    Data is fetched either by running a raw database command / aggregation
    pipeline, or by querying a single collection. In both cases the result
    is handed back as a pandas DataFrame.
    """

    def __init__(self, **args):
        # All setup is delegated to the Mongo base initializer.
        Mongo.__init__(self, **args)

    def read(self, **args):
        """
        Read data from the database and return it as a pandas DataFrame.

        Command mode (used when any of 'mongo', 'cmd' or 'pipeline' is given):
        :param pipeline     aggregation pipeline; the command is built as
                            {aggregate: <collection>, pipeline: <pipeline>}
        :param aggregate    collection to aggregate over when 'pipeline' is
                            given (defaults to self.collection)
        :param mongo|cmd    full command to run when no 'pipeline' is given
                            ('mongo' takes precedence over 'cmd')

        Collection mode (none of the above is given):
        :param table|collection  collection to query ('table' takes
                                 precedence, defaults to self.collection)
        :param filter            query filter passed to find() (default {})

        :return pandas.DataFrame. In command mode it is built from the
                'values' entry, the cursor batches (all batches are fetched
                with getMore) or the 'n' entry of the reply. In collection
                mode it holds the matching documents without '_id'.
        """
        if 'mongo' in args or 'cmd' in args or 'pipeline' in args:
            if 'pipeline' in args:
                # 'aggregate' is inserted first on purpose: it is the command
                # name and must lead the command document.
                cmd = {
                    'aggregate': args['aggregate'] if 'aggregate' in args else self.collection,
                    'pipeline': args['pipeline'],
                }
            else:
                raw = args['mongo'] if 'mongo' in args else args['cmd']
                # Shallow-copy mappings so the defaults injected below never
                # leak into the caller's object; anything else (e.g. a plain
                # command-name string) is passed through untouched.
                cmd = raw.copy() if isinstance(raw, dict) else raw

            if "aggregate" in cmd:
                cmd.setdefault("allowDiskUse", True)
                cmd.setdefault("cursor", {})

            rows = []
            out = self.db.command(cmd)
            # @TODO: consider turning this loop into a generator
            while True:
                if 'values' in out:
                    rows += out['values']

                if 'cursor' not in out:
                    # Replies without a cursor: keep a truthy 'n' entry if
                    # there is one, then stop.
                    if 'n' in out and out['n']:
                        rows.append(out['n'])
                    break

                cursor = out['cursor']
                key = 'firstBatch' if 'firstBatch' in cursor else 'nextBatch'
                batch = cursor.get(key)
                if batch:
                    rows += list(batch)

                if cursor['id'] == 0:
                    # A cursor id of 0 means there is nothing left to fetch.
                    break

                # 'ns' is "<database>.<collection>". Split on the FIRST dot
                # only, so dotted collection names (e.g. "fs.files") survive.
                out = self.db.command({
                    "getMore": cursor['id'],
                    "collection": cursor['ns'].split(".", 1)[-1],
                })
            return pd.DataFrame(rows)

        if 'table' in args:
            _uid = args['table']
        elif 'collection' in args:
            _uid = args['collection']
        else:
            _uid = self.collection

        collection = self.db[_uid]
        _filter = args['filter'] if 'filter' in args else {}
        _df = pd.DataFrame(collection.find(_filter))
        # Drop the document identifier by name rather than by position, so a
        # real data column is never discarded when '_id' is absent.
        return _df.drop(columns=['_id'], errors='ignore')

    def view(self, **args):
        """
        Intended to execute a view (map/reduce) operation.

        Not implemented: the method currently does nothing and returns None.
        """
        pass
|
|
|
|
class Writer(Mongo):
|
|
|
|
"""
|
|
|
|
This class is designed to write to a mongodb collection within a database
|
|
|
|
"""
|
|
|
|
def __init__(self, **args):
    """
    Set up the writer.

    :param args keyword arguments, forwarded unchanged to Mongo.__init__
    """
    # Nothing writer-specific happens here: setup is left to the base class.
    Mongo.__init__(self, **args)
|
|
|
|
def upload(self, **args):
    """
    Store a file in the current database through GridFS.

    Every keyword argument is forwarded as-is to GridFS.put.

    :param data         binary stream/text to be stored
    :param filename     filename to be used
    :param encoding     content encoding (default utf-8)
    """
    # Fall back to utf-8 only when the caller did not pick an encoding.
    args.setdefault('encoding', 'utf-8')
    # NOTE(review): GridFS is not among the imports visible at the top of
    # this file - confirm it is imported somewhere in the module.
    GridFS(self.db).put(**args)
|
|
|
|
|
|
|
|
def archive(self):
    """
    Archive the current collection, then empty it.

    Every document of self.collection is serialized into a single JSON
    array (ObjectId identifiers are converted to strings) and stored via
    upload() under the name "<collection>.archive.<year>-<month>-<day>.json".
    The collection is emptied only after the archive has been stored, so a
    failed upload does not lose the documents.
    """
    collection = self.db[self.collection]
    rows = list(collection.find())
    for row in rows:
        # ObjectId values are turned into plain strings before encoding.
        if isinstance(row['_id'], ObjectId):
            row['_id'] = str(row['_id'])

    # Serialize the documents themselves, not the collection handle.
    # NOTE(review): json and IEncoder are not among the imports visible at
    # the top of this file - confirm they are imported in the module.
    stream = Binary(json.dumps(rows, cls=IEncoder).encode())

    # Read the clock once so the three date parts always belong to the same
    # day; year/month/day are attributes, not methods.
    today = datetime.now()
    now = "-".join([str(today.year), str(today.month), str(today.day)])
    name = ".".join([self.collection, 'archive', now]) + ".json"
    description = " ".join([self.collection, 'archive', str(len(rows))])

    self.upload(filename=name, data=stream, description=description, content_type='application/json')

    # Purge the collection only once the archive is safely stored.
    collection.delete_many({})
|
|
|
|
|
|
|
|
def write(self,info,**_args):
|
|
|
|
"""
|
|
|
|
This function will write to a given collection i.e add a record to a collection (no updates)
|
|
|
|
@param info new record in the collection to be added
|
|
|
|
"""
|
|
|
|
# document = self.db[self.collection].find()
|
|
|
|
#collection = self.db[self.collection]
|
|
|
|
# if type(info) == list :
|
|
|
|
# self.db[self.collection].insert_many(info)
|
|
|
|
# else:
|
|
|
|
try:
|
|
|
|
if 'table' in _args or 'collection' in _args :
|
|
|
|
_uid = _args['table'] if 'table' in _args else _args['collection']
|
|
|
|
else:
|
|
|
|
_uid = self.collection if 'doc' not in _args else _args['doc']
|
|
|
|
if self._lock :
|
|
|
|
Mongo.lock.acquire()
|
|
|
|
if type(info) == list or type(info) == pd.DataFrame :
|
|
|
|
if type(info) == pd.DataFrame :
|
|
|
|
info = info.to_dict(orient='records')
|
|
|
|
# info if type(info) == list else info.to_dict(orient='records')
|