From 5660d8ba593f34a677759b575b58436dfef8a53f Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Sat, 11 Nov 2023 10:30:58 -0600 Subject: [PATCH] nextcloud handling --- transport/disk.py | 2 +- transport/nextcloud.py | 76 ++++++++++++++++++++++++++++++++++++++++++ transport/providers.py | 18 ++++++---- transport/version.py | 2 +- 4 files changed, 90 insertions(+), 8 deletions(-) create mode 100644 transport/nextcloud.py diff --git a/transport/disk.py b/transport/disk.py index d8ee757..f092a3d 100644 --- a/transport/disk.py +++ b/transport/disk.py @@ -64,7 +64,7 @@ class DiskWriter(Writer): def __init__(self,**params): super().__init__() self._path = params['path'] - self._delimiter = params['delimiter'] + self._delimiter = params['delimiter'] if 'delimiter' in params else None self._mode = 'w' if 'mode' not in params else params['mode'] # def meta(self): # return self.cache['meta'] diff --git a/transport/nextcloud.py b/transport/nextcloud.py new file mode 100644 index 0000000..457eb83 --- /dev/null +++ b/transport/nextcloud.py @@ -0,0 +1,76 @@ +""" +We are implementing transport to and from nextcloud (just like s3) +""" +import os +import sys +from transport.common import Reader,Writer +import pandas as pd +from io import StringIO +import json +import nextcloud_client as nextcloud + +class Nextcloud : + def __init__(self,**_args): + pass + self._delimiter = None + self._handler = nextcloud.Client(_args['url']) + _uid = _args['uid'] + _token = _args['token'] + self._uri = _args['folder'] if 'folder' in _args else './' + if self._uri.endswith('/') : + self._uri = self._uri[:-1] + self._file = None if 'file' not in _args else _args['file'] + self._handler.login(_uid,_token) + def close(self): + try: + self._handler.logout() + except Exception as e: + pass + + +class NextcloudReader(Nextcloud,Reader): + def __init__(self,**_args): + # self._file = [] if 'file' not in _args else _args['file'] + super().__init__(**_args) + pass + def read(self,**_args): + _filename = self._file if 'file' not in _args else _args['file'] + # + # @TODO: if _filename is none, an exception should be raised + # + _uri = '/'.join([self._uri,_filename]) + if self._handler.get_file(_uri) : + # + # + _info = self._handler.file_info(_uri) + _content = self._handler.get_file_contents(_uri).decode('utf8') + if _info.get_content_type() == 'text/csv' : + _file = StringIO(_content) + return pd.read_csv(_file) + else: + return _content + return None +class NextcloudWriter (Nextcloud,Writer): + """ + This class will write data to an instance of nextcloud + """ + def __init__(self,**_args) : + super().__init__(**_args) + self + def write(self,_data,**_args): + """ + This function will upload a file to a given destination + :file has the uri of the location of the file + """ + _filename = self._file if 'file' not in _args else _args['file'] + _uri = '/'.join([self._uri,_filename]) + if type(_data) == pd.DataFrame : + f = StringIO() + _data.to_csv(f,index=False) + _content = f.getvalue() + elif type(_data) == dict : + _content = json.dumps(_data) + else: + _content = str(_data) + self._handler.put_file_contents(_uri,_content) + diff --git a/transport/providers.py b/transport/providers.py index fc394f3..a798960 100644 --- a/transport/providers.py +++ b/transport/providers.py @@ -10,6 +10,7 @@ from transport import etl as etl from transport import qlistener from transport import bricks from transport import session +from transport import nextcloud import psycopg2 as pg import mysql.connector as my from google.cloud import bigquery as bq @@ -34,7 +35,7 @@ MARIADB = 'mariadb' COUCHDB = 'couch' CONSOLE = 'console' ETL = 'etl' - +NEXTCLOUD = 'nextcloud' # # synonyms of the above @@ -49,18 +50,19 @@ AWS_S3 = 's3' RABBIT = RABBITMQ QLISTENER = 'qlistener' +QUEUE = QLISTENER DATABRICKS= 'databricks+connector' DRIVERS = {PG:pg,REDSHIFT:pg,MYSQL:my,MARIADB:my,NETEZZA:nz,SQLITE:sqlite3} -CATEGORIES ={'sql':[NETEZZA,PG,MYSQL,REDSHIFT,SQLITE,MARIADB],'nosql':[MONGODB,COUCHDB],'cloud':[BIGQUERY,DATABRICKS],'file':[FILE], - 'queue':[RABBIT,QLISTENER],'memory':[CONSOLE,QLISTENER],'http':[HTTP]} +CATEGORIES ={'sql':[NETEZZA,PG,MYSQL,REDSHIFT,SQLITE,MARIADB],'nosql':[MONGODB,COUCHDB],'cloud':[NEXTCLOUD,S3,BIGQUERY,DATABRICKS],'file':[FILE], + 'queue':[RABBIT,QLISTENER],'memory':[CONSOLE,QUEUE],'http':[HTTP]} READ = {'sql':sql.SQLReader,'nosql':{MONGODB:mongo.MongoReader,COUCHDB:couch.CouchReader}, - 'cloud':{BIGQUERY:sql.BigQueryReader,DATABRICKS:bricks.BricksReader}, + 'cloud':{BIGQUERY:sql.BigQueryReader,DATABRICKS:bricks.BricksReader,NEXTCLOUD:nextcloud.NextcloudReader}, 'file':disk.DiskReader,'queue':{RABBIT:queue.QueueReader,QLISTENER:qlistener.qListener}, # 'cli':{CONSOLE:Console},'memory':{CONSOLE:Console},'http':session.HttpReader } WRITE = {'sql':sql.SQLWriter,'nosql':{MONGODB:mongo.MongoWriter,COUCHDB:couch.CouchWriter}, - 'cloud':{BIGQUERY:sql.BigQueryWriter,DATABRICKS:bricks.BricksWriter}, + 'cloud':{BIGQUERY:sql.BigQueryWriter,DATABRICKS:bricks.BricksWriter,NEXTCLOUD:nextcloud.NextcloudWriter}, 'file':disk.DiskWriter,'queue':{RABBIT:queue.QueueWriter,QLISTENER:qlistener.qListener}, # 'cli':{CONSOLE:Console}, # 'memory':{CONSOLE:Console}, 'http':session.HttpReader @@ -78,12 +80,16 @@ PROVIDERS = { MYSQL:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':my,'default':{'host':'localhost','port':3306}}, MARIADB:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':my,'default':{'host':'localhost','port':3306}}, + S3:{'read':s3.s3Reader,'write':s3.s3Writer}, BIGQUERY:{'read':sql.BigQueryReader,'write':sql.BigQueryWriter}, + DATABRICKS:{'read':bricks.BricksReader,'write':bricks.BricksWriter}, + NEXTCLOUD:{'read':nextcloud.NextcloudReader,'write':nextcloud.NextcloudWriter}, + QLISTENER:{'read':qlistener.qListener,'write':qlistener.qListener,'default':{'host':'localhost','port':5672}}, CONSOLE:{'read':qlistener.Console,"write":qlistener.Console}, HTTP:{'read':session.HttpReader,'write':session.HttpWriter}, - DATABRICKS:{'read':bricks.BricksReader,'write':bricks.BricksWriter}, + MONGODB:{'read':mongo.MongoReader,'write':mongo.MongoWriter,'default':{'port':27017,'host':'localhost'}}, COUCHDB:{'read':couch.CouchReader,'writer':couch.CouchWriter,'default':{'host':'localhost','port':5984}}, ETL :{'read':etl.Transporter,'write':etl.Transporter} diff --git a/transport/version.py b/transport/version.py index 5e7e7b7..3fa6e8d 100644 --- a/transport/version.py +++ b/transport/version.py @@ -1,2 +1,2 @@ __author__ = 'The Phi Technology' -__version__= '1.9.0' +__version__= '1.9.2'