You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
data-transport/transport/nosql/mongodb.py

262 lines
10 KiB
Python

"""
Data Transport - 1.0
Steve L. Nyemba, The Phi Technology LLC
This file is a wrapper around mongodb for reading/writing content against a mongodb server and executing views (mapreduce)
"""
from pymongo import MongoClient
import bson
from bson.objectid import ObjectId
from bson.binary import Binary
# import nujson as json
from datetime import datetime
import pandas as pd
import numpy as np
self.client.close()
def meta(self,**_args):
return []
class Reader(Mongo):
"""
This class will read from a mongodb data store and return the content of a document (not a collection)
"""
def __init__(self,**args):
Mongo.__init__(self,**args)
def read(self,**args):
if 'mongo' in args or 'cmd' in args or 'pipeline' in args:
#
# @TODO:
cmd = {}
if 'aggregate' not in cmd and 'aggregate' not in args:
cmd['aggregate'] = self.collection
elif 'aggregate' in args :
cmd['aggregate'] = args['aggregate']
if 'pipeline' in args :
cmd['pipeline']= args['pipeline']
if 'pipeline' not in args or 'aggregate' not in cmd :
cmd = args['mongo'] if 'mongo' in args else args['cmd']
if "aggregate" in cmd :
if "allowDiskUse" not in cmd :
cmd["allowDiskUse"] = True
if "cursor" not in cmd :
cmd["cursor"] = {}
r = []
out = self.db.command(cmd)
#@TODO: consider using a yield (generator) works wonders
while True :
if 'values' in out :
r += out['values']
if 'cursor' in out :
key = 'firstBatch' if 'firstBatch' in out['cursor'] else 'nextBatch'
else:
key = 'n'
if 'cursor' in out and out['cursor'][key] :
r += list(out['cursor'][key])
elif key in out and out[key]:
r.append (out[key])
# yield out['cursor'][key]
if key not in ['firstBatch','nextBatch'] or ('cursor' in out and out['cursor']['id'] == 0) :
break
else:
out = self.db.command({"getMore":out['cursor']['id'],"collection":out['cursor']['ns'].split(".")[-1]})
return pd.DataFrame(r)
else:
if 'table' in args or 'collection' in args :
if 'table' in args:
_uid = args['table']
elif 'collection' in args :
_uid = args['collection']
else:
_uid = self.collection
else:
_uid = self.collection
collection = self.db[_uid]
_filter = args['filter'] if 'filter' in args else {}
_df = pd.DataFrame(collection.find(_filter))
columns = _df.columns.tolist()[1:]
return _df[columns]
def view(self,**args):
"""
This function is designed to execute a view (map/reduce) operation
"""
pass
class Writer(Mongo):
"""
This class is designed to write to a mongodb collection within a database
"""
def __init__(self,**args):
Mongo.__init__(self,**args)
def upload(self,**args) :
"""
This function will upload a file to the current database (using GridFS)
:param data binary stream/text to be stored
:param filename filename to be used
:param encoding content_encoding (default utf-8)
"""
if 'encoding' not in args :
args['encoding'] = 'utf-8'
gfs = GridFS(self.db)
gfs.put(**args)
def archive(self):
"""
This function will archive documents to the
"""
collection = self.db[self.collection]
rows = list(collection.find())
for row in rows :
if type(row['_id']) == ObjectId :
row['_id'] = str(row['_id'])
stream = Binary(json.dumps(collection,cls=IEncoder).encode())
collection.delete_many({})
now = "-".join([str(datetime.now().year()),str(datetime.now().month), str(datetime.now().day)])
name = ".".join([self.collection,'archive',now])+".json"
description = " ".join([self.collection,'archive',str(len(rows))])
self.upload(filename=name,data=stream,description=description,content_type='application/json')
# gfs = GridFS(self.db)
# gfs.put(filename=name,description=description,data=stream,encoding='utf-8')
# self.write({{"filename":name,"file":stream,"description":descriptions}})
pass
def write(self,info,**_args):
"""
This function will write to a given collection i.e add a record to a collection (no updates)
@param info new record in the collection to be added
"""
# document = self.db[self.collection].find()
#collection = self.db[self.collection]
# if type(info) == list :
# self.db[self.collection].insert_many(info)
# else:
try:
if 'table' in _args or 'collection' in _args :
_uid = _args['table'] if 'table' in _args else _args['collection']
else:
_uid = self.collection if 'doc' not in _args else _args['doc']
if self._lock :
Mongo.lock.acquire()
if type(info) == list or type(info) == pd.DataFrame :
if type(info) == pd.DataFrame :
info = info.to_dict(orient='records')
# info if type(info) == list else info.to_dict(orient='records')