Adding nextcloud support #5

Merged
steve merged 6 commits from nextcloud into dev 11 months ago

@ -17,7 +17,7 @@ args = {
"license":"MIT", "license":"MIT",
"packages":["transport"]} "packages":["transport"]}
args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite'] args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite']
args["install_requires"] = ['pymongo','sqlalchemy<2.0.0','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python'] args["install_requires"] = ['pyncclient','pymongo','sqlalchemy<2.0.0','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python']
args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git" args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git"
args['scripts'] = ['bin/transport'] args['scripts'] = ['bin/transport']
if sys.version_info[0] == 2 : if sys.version_info[0] == 2 :

@ -64,7 +64,7 @@ class DiskWriter(Writer):
def __init__(self,**params): def __init__(self,**params):
super().__init__() super().__init__()
self._path = params['path'] self._path = params['path']
self._delimiter = params['delimiter'] self._delimiter = params['delimiter'] if 'delimiter' in params else None
self._mode = 'w' if 'mode' not in params else params['mode'] self._mode = 'w' if 'mode' not in params else params['mode']
# def meta(self): # def meta(self):
# return self.cache['meta'] # return self.cache['meta']
@ -209,17 +209,20 @@ class SQLiteWriter(SQLite,DiskWriter) :
""" """
""" """
if not self.fields : #if not self.fields :
if type(info) == pd.DataFrame : # #if type(info) == pd.DataFrame :
_columns = list(info.columns) # # _columns = list(info.columns)
self.init(list(info.keys())) # #self.init(list(info.keys()))
if type(info) == dict : if type(info) == dict :
info = [info] info = [info]
elif type(info) == pd.DataFrame : elif type(info) == pd.DataFrame :
info = info.fillna('') info = info.fillna('')
info = info.to_dict(orient='records') info = info.to_dict(orient='records')
if not self.fields :
_rec = info[0]
self.init(list(_rec.keys()))
SQLiteWriter.LOCK.acquire() SQLiteWriter.LOCK.acquire()
try: try:
@ -238,4 +241,4 @@ class SQLiteWriter(SQLite,DiskWriter) :
except Exception as e : except Exception as e :
print (e) print (e)
pass pass
SQLiteWriter.LOCK.release() SQLiteWriter.LOCK.release()

@ -0,0 +1,80 @@
"""
We are implementing transport to and from nextcloud (just like s3)
"""
import os
import sys
from transport.common import Reader,Writer
import pandas as pd
from io import StringIO
import json
import nextcloud_client as nextcloud
class Nextcloud :
def __init__(self,**_args):
pass
self._delimiter = None
self._handler = nextcloud.Client(_args['url'])
_uid = _args['uid']
_token = _args['token']
self._uri = _args['folder'] if 'folder' in _args else './'
if self._uri.endswith('/') :
self._uri = self._uri[:-1]
self._file = None if 'file' not in _args else _args['file']
self._handler.login(_uid,_token)
def close(self):
try:
self._handler.logout()
except Exception as e:
pass
class NextcloudReader(Nextcloud,Reader):
def __init__(self,**_args):
# self._file = [] if 'file' not in _args else _args['file']
super().__init__(**_args)
pass
def read(self,**_args):
_filename = self._file if 'file' not in _args else _args['file']
#
# @TODO: if _filename is none, an exception should be raised
#
_uri = '/'.join([self._uri,_filename])
if self._handler.get_file(_uri) :
#
#
_info = self._handler.file_info(_uri)
_content = self._handler.get_file_contents(_uri).decode('utf8')
if _info.get_content_type() == 'text/csv' :
#
# @TODO: enable handling of csv, xls, parquet, pickles
_file = StringIO(_content)
return pd.read_csv(_file)
else:
#
# if it is neither a structured document like csv, we will return the content as is
return _content
return None
class NextcloudWriter (Nextcloud,Writer):
"""
This class will write data to an instance of nextcloud
"""
def __init__(self,**_args) :
super().__init__(**_args)
self
def write(self,_data,**_args):
"""
This function will upload a file to a given destination
:file has the uri of the location of the file
"""
_filename = self._file if 'file' not in _args else _args['file']
_uri = '/'.join([self._uri,_filename])
if type(_data) == pd.DataFrame :
f = StringIO()
_data.to_csv(f,index=False)
_content = f.getvalue()
elif type(_data) == dict :
_content = json.dumps(_data)
else:
_content = str(_data)
self._handler.put_file_contents(_uri,_content)

@ -10,6 +10,7 @@ from transport import etl as etl
from transport import qlistener from transport import qlistener
from transport import bricks from transport import bricks
from transport import session from transport import session
from transport import nextcloud
import psycopg2 as pg import psycopg2 as pg
import mysql.connector as my import mysql.connector as my
from google.cloud import bigquery as bq from google.cloud import bigquery as bq
@ -34,7 +35,7 @@ MARIADB = 'mariadb'
COUCHDB = 'couch' COUCHDB = 'couch'
CONSOLE = 'console' CONSOLE = 'console'
ETL = 'etl' ETL = 'etl'
NEXTCLOUD = 'nextcloud'
# #
# synonyms of the above # synonyms of the above
@ -49,18 +50,19 @@ AWS_S3 = 's3'
RABBIT = RABBITMQ RABBIT = RABBITMQ
QLISTENER = 'qlistener' QLISTENER = 'qlistener'
QUEUE = QLISTENER
DATABRICKS= 'databricks+connector' DATABRICKS= 'databricks+connector'
DRIVERS = {PG:pg,REDSHIFT:pg,MYSQL:my,MARIADB:my,NETEZZA:nz,SQLITE:sqlite3} DRIVERS = {PG:pg,REDSHIFT:pg,MYSQL:my,MARIADB:my,NETEZZA:nz,SQLITE:sqlite3}
CATEGORIES ={'sql':[NETEZZA,PG,MYSQL,REDSHIFT,SQLITE,MARIADB],'nosql':[MONGODB,COUCHDB],'cloud':[BIGQUERY,DATABRICKS],'file':[FILE], CATEGORIES ={'sql':[NETEZZA,PG,MYSQL,REDSHIFT,SQLITE,MARIADB],'nosql':[MONGODB,COUCHDB],'cloud':[NEXTCLOUD,S3,BIGQUERY,DATABRICKS],'file':[FILE],
'queue':[RABBIT,QLISTENER],'memory':[CONSOLE,QLISTENER],'http':[HTTP]} 'queue':[RABBIT,QLISTENER],'memory':[CONSOLE,QUEUE],'http':[HTTP]}
READ = {'sql':sql.SQLReader,'nosql':{MONGODB:mongo.MongoReader,COUCHDB:couch.CouchReader}, READ = {'sql':sql.SQLReader,'nosql':{MONGODB:mongo.MongoReader,COUCHDB:couch.CouchReader},
'cloud':{BIGQUERY:sql.BigQueryReader,DATABRICKS:bricks.BricksReader}, 'cloud':{BIGQUERY:sql.BigQueryReader,DATABRICKS:bricks.BricksReader,NEXTCLOUD:nextcloud.NextcloudReader},
'file':disk.DiskReader,'queue':{RABBIT:queue.QueueReader,QLISTENER:qlistener.qListener}, 'file':disk.DiskReader,'queue':{RABBIT:queue.QueueReader,QLISTENER:qlistener.qListener},
# 'cli':{CONSOLE:Console},'memory':{CONSOLE:Console},'http':session.HttpReader # 'cli':{CONSOLE:Console},'memory':{CONSOLE:Console},'http':session.HttpReader
} }
WRITE = {'sql':sql.SQLWriter,'nosql':{MONGODB:mongo.MongoWriter,COUCHDB:couch.CouchWriter}, WRITE = {'sql':sql.SQLWriter,'nosql':{MONGODB:mongo.MongoWriter,COUCHDB:couch.CouchWriter},
'cloud':{BIGQUERY:sql.BigQueryWriter,DATABRICKS:bricks.BricksWriter}, 'cloud':{BIGQUERY:sql.BigQueryWriter,DATABRICKS:bricks.BricksWriter,NEXTCLOUD:nextcloud.NextcloudWriter},
'file':disk.DiskWriter,'queue':{RABBIT:queue.QueueWriter,QLISTENER:qlistener.qListener}, 'file':disk.DiskWriter,'queue':{RABBIT:queue.QueueWriter,QLISTENER:qlistener.qListener},
# 'cli':{CONSOLE:Console}, # 'cli':{CONSOLE:Console},
# 'memory':{CONSOLE:Console}, 'http':session.HttpReader # 'memory':{CONSOLE:Console}, 'http':session.HttpReader
@ -70,6 +72,7 @@ WRITE = {'sql':sql.SQLWriter,'nosql':{MONGODB:mongo.MongoWriter,COUCHDB:couch.Co
PROVIDERS = { PROVIDERS = {
FILE:{'read':disk.DiskReader,'write':disk.DiskWriter}, FILE:{'read':disk.DiskReader,'write':disk.DiskWriter},
SQLITE:{'read':disk.SQLiteReader,'write':disk.SQLiteWriter,'driver':sqlite3}, SQLITE:{'read':disk.SQLiteReader,'write':disk.SQLiteWriter,'driver':sqlite3},
'sqlite3':{'read':disk.SQLiteReader,'write':disk.SQLiteWriter,'driver':sqlite3},
POSTGRESQL:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':pg,'default':{'host':'localhost','port':5432}}, POSTGRESQL:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':pg,'default':{'host':'localhost','port':5432}},
NETEZZA:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':nz,'default':{'port':5480}}, NETEZZA:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':nz,'default':{'port':5480}},
@ -78,12 +81,16 @@ PROVIDERS = {
MYSQL:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':my,'default':{'host':'localhost','port':3306}}, MYSQL:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':my,'default':{'host':'localhost','port':3306}},
MARIADB:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':my,'default':{'host':'localhost','port':3306}}, MARIADB:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':my,'default':{'host':'localhost','port':3306}},
S3:{'read':s3.s3Reader,'write':s3.s3Writer}, S3:{'read':s3.s3Reader,'write':s3.s3Writer},
BIGQUERY:{'read':sql.BigQueryReader,'write':sql.BigQueryWriter}, BIGQUERY:{'read':sql.BigQueryReader,'write':sql.BigQueryWriter},
DATABRICKS:{'read':bricks.BricksReader,'write':bricks.BricksWriter},
NEXTCLOUD:{'read':nextcloud.NextcloudReader,'write':nextcloud.NextcloudWriter},
QLISTENER:{'read':qlistener.qListener,'write':qlistener.qListener,'default':{'host':'localhost','port':5672}}, QLISTENER:{'read':qlistener.qListener,'write':qlistener.qListener,'default':{'host':'localhost','port':5672}},
CONSOLE:{'read':qlistener.Console,"write":qlistener.Console}, CONSOLE:{'read':qlistener.Console,"write":qlistener.Console},
HTTP:{'read':session.HttpReader,'write':session.HttpWriter}, HTTP:{'read':session.HttpReader,'write':session.HttpWriter},
DATABRICKS:{'read':bricks.BricksReader,'write':bricks.BricksWriter},
MONGODB:{'read':mongo.MongoReader,'write':mongo.MongoWriter,'default':{'port':27017,'host':'localhost'}}, MONGODB:{'read':mongo.MongoReader,'write':mongo.MongoWriter,'default':{'port':27017,'host':'localhost'}},
COUCHDB:{'read':couch.CouchReader,'writer':couch.CouchWriter,'default':{'host':'localhost','port':5984}}, COUCHDB:{'read':couch.CouchReader,'writer':couch.CouchWriter,'default':{'host':'localhost','port':5984}},
ETL :{'read':etl.Transporter,'write':etl.Transporter} ETL :{'read':etl.Transporter,'write':etl.Transporter}
@ -92,4 +99,4 @@ DEFAULT = {PG:{'host':'localhost','port':5432},MYSQL:{'host':'localhost','port':
DEFAULT[MONGODB] = {'port':27017,'host':'localhost'} DEFAULT[MONGODB] = {'port':27017,'host':'localhost'}
DEFAULT[REDSHIFT] = DEFAULT[PG] DEFAULT[REDSHIFT] = DEFAULT[PG]
DEFAULT[MARIADB] = DEFAULT[MYSQL] DEFAULT[MARIADB] = DEFAULT[MYSQL]
DEFAULT[NETEZZA] = {'port':5480} DEFAULT[NETEZZA] = {'port':5480}

@ -1,2 +1,2 @@
__author__ = 'The Phi Technology' __author__ = 'The Phi Technology'
__version__= '1.9.0' __version__= '1.9.2'

Loading…
Cancel
Save