diff --git a/setup.py b/setup.py index 254bb5c..c322c38 100644 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ args = { "license":"MIT", "packages":["transport"]} args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite'] -args["install_requires"] = ['pymongo','sqlalchemy<2.0.0','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python'] +args["install_requires"] = ['pyncclient','pymongo','sqlalchemy<2.0.0','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python'] args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git" args['scripts'] = ['bin/transport'] if sys.version_info[0] == 2 : diff --git a/transport/disk.py b/transport/disk.py index d8ee757..956386d 100644 --- a/transport/disk.py +++ b/transport/disk.py @@ -64,7 +64,7 @@ class DiskWriter(Writer): def __init__(self,**params): super().__init__() self._path = params['path'] - self._delimiter = params['delimiter'] + self._delimiter = params['delimiter'] if 'delimiter' in params else None self._mode = 'w' if 'mode' not in params else params['mode'] # def meta(self): # return self.cache['meta'] @@ -209,17 +209,20 @@ class SQLiteWriter(SQLite,DiskWriter) : """ """ - if not self.fields : - if type(info) == pd.DataFrame : - _columns = list(info.columns) - self.init(list(info.keys())) + #if not self.fields : + # #if type(info) == pd.DataFrame : + # # _columns = list(info.columns) + # #self.init(list(info.keys())) if type(info) == dict : info = [info] elif type(info) == pd.DataFrame : info = info.fillna('') info = info.to_dict(orient='records') - + if not self.fields : + _rec = info[0] + self.init(list(_rec.keys())) + SQLiteWriter.LOCK.acquire() try: @@ -238,4 +241,4 @@ class SQLiteWriter(SQLite,DiskWriter) : except Exception as e : print (e) pass - SQLiteWriter.LOCK.release() \ No newline at end of file + SQLiteWriter.LOCK.release() diff --git a/transport/nextcloud.py b/transport/nextcloud.py new file mode 100644 index 0000000..f096f70 --- /dev/null +++ b/transport/nextcloud.py @@ -0,0 +1,80 @@ +""" +We are implementing transport to and from nextcloud (just like s3) +""" +import os +import sys +from transport.common import Reader,Writer +import pandas as pd +from io import StringIO +import json +import nextcloud_client as nextcloud + +class Nextcloud : + def __init__(self,**_args): + pass + self._delimiter = None + self._handler = nextcloud.Client(_args['url']) + _uid = _args['uid'] + _token = _args['token'] + self._uri = _args['folder'] if 'folder' in _args else './' + if self._uri.endswith('/') : + self._uri = self._uri[:-1] + self._file = None if 'file' not in _args else _args['file'] + self._handler.login(_uid,_token) + def close(self): + try: + self._handler.logout() + except Exception as e: + pass + + +class NextcloudReader(Nextcloud,Reader): + def __init__(self,**_args): + # self._file = [] if 'file' not in _args else _args['file'] + super().__init__(**_args) + pass + def read(self,**_args): + _filename = self._file if 'file' not in _args else _args['file'] + # + # @TODO: if _filename is none, an exception should be raised + # + _uri = '/'.join([self._uri,_filename]) + if self._handler.get_file(_uri) : + # + # + _info = self._handler.file_info(_uri) + _content = self._handler.get_file_contents(_uri).decode('utf8') + if _info.get_content_type() == 'text/csv' : + # + # @TODO: enable handling of csv, xls, parquet, pickles + _file = StringIO(_content) + return pd.read_csv(_file) + else: + # + # if it is neither a structured document like csv, we will return the content as is + return _content + return None +class NextcloudWriter (Nextcloud,Writer): + """ + This class will write data to an instance of nextcloud + """ + def __init__(self,**_args) : + super().__init__(**_args) + self + def write(self,_data,**_args): + """ + This function will upload a file to a given destination + :file has the uri of the location of the file + """ + _filename = self._file if 'file' not in _args else _args['file'] + _uri = '/'.join([self._uri,_filename]) + if type(_data) == pd.DataFrame : + f = StringIO() + _data.to_csv(f,index=False) + _content = f.getvalue() + elif type(_data) == dict : + _content = json.dumps(_data) + else: + _content = str(_data) + self._handler.put_file_contents(_uri,_content) + diff --git a/transport/providers.py b/transport/providers.py index fc394f3..23843e7 100644 --- a/transport/providers.py +++ b/transport/providers.py @@ -10,6 +10,7 @@ from transport import etl as etl from transport import qlistener from transport import bricks from transport import session +from transport import nextcloud import psycopg2 as pg import mysql.connector as my from google.cloud import bigquery as bq @@ -34,7 +35,7 @@ MARIADB = 'mariadb' COUCHDB = 'couch' CONSOLE = 'console' ETL = 'etl' - +NEXTCLOUD = 'nextcloud' # # synonyms of the above @@ -49,18 +50,19 @@ AWS_S3 = 's3' RABBIT = RABBITMQ QLISTENER = 'qlistener' +QUEUE = QLISTENER DATABRICKS= 'databricks+connector' DRIVERS = {PG:pg,REDSHIFT:pg,MYSQL:my,MARIADB:my,NETEZZA:nz,SQLITE:sqlite3} -CATEGORIES ={'sql':[NETEZZA,PG,MYSQL,REDSHIFT,SQLITE,MARIADB],'nosql':[MONGODB,COUCHDB],'cloud':[BIGQUERY,DATABRICKS],'file':[FILE], - 'queue':[RABBIT,QLISTENER],'memory':[CONSOLE,QLISTENER],'http':[HTTP]} +CATEGORIES ={'sql':[NETEZZA,PG,MYSQL,REDSHIFT,SQLITE,MARIADB],'nosql':[MONGODB,COUCHDB],'cloud':[NEXTCLOUD,S3,BIGQUERY,DATABRICKS],'file':[FILE], + 'queue':[RABBIT,QLISTENER],'memory':[CONSOLE,QUEUE],'http':[HTTP]} READ = {'sql':sql.SQLReader,'nosql':{MONGODB:mongo.MongoReader,COUCHDB:couch.CouchReader}, - 'cloud':{BIGQUERY:sql.BigQueryReader,DATABRICKS:bricks.BricksReader}, + 'cloud':{BIGQUERY:sql.BigQueryReader,DATABRICKS:bricks.BricksReader,NEXTCLOUD:nextcloud.NextcloudReader}, 'file':disk.DiskReader,'queue':{RABBIT:queue.QueueReader,QLISTENER:qlistener.qListener}, # 'cli':{CONSOLE:Console},'memory':{CONSOLE:Console},'http':session.HttpReader } WRITE = {'sql':sql.SQLWriter,'nosql':{MONGODB:mongo.MongoWriter,COUCHDB:couch.CouchWriter}, - 'cloud':{BIGQUERY:sql.BigQueryWriter,DATABRICKS:bricks.BricksWriter}, + 'cloud':{BIGQUERY:sql.BigQueryWriter,DATABRICKS:bricks.BricksWriter,NEXTCLOUD:nextcloud.NextcloudWriter}, 'file':disk.DiskWriter,'queue':{RABBIT:queue.QueueWriter,QLISTENER:qlistener.qListener}, # 'cli':{CONSOLE:Console}, # 'memory':{CONSOLE:Console}, 'http':session.HttpReader @@ -70,6 +72,7 @@ WRITE = {'sql':sql.SQLWriter,'nosql':{MONGODB:mongo.MongoWriter,COUCHDB:couch.Co PROVIDERS = { FILE:{'read':disk.DiskReader,'write':disk.DiskWriter}, SQLITE:{'read':disk.SQLiteReader,'write':disk.SQLiteWriter,'driver':sqlite3}, + 'sqlite3':{'read':disk.SQLiteReader,'write':disk.SQLiteWriter,'driver':sqlite3}, POSTGRESQL:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':pg,'default':{'host':'localhost','port':5432}}, NETEZZA:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':nz,'default':{'port':5480}}, @@ -78,12 +81,16 @@ PROVIDERS = { MYSQL:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':my,'default':{'host':'localhost','port':3306}}, MARIADB:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':my,'default':{'host':'localhost','port':3306}}, + S3:{'read':s3.s3Reader,'write':s3.s3Writer}, BIGQUERY:{'read':sql.BigQueryReader,'write':sql.BigQueryWriter}, + DATABRICKS:{'read':bricks.BricksReader,'write':bricks.BricksWriter}, + NEXTCLOUD:{'read':nextcloud.NextcloudReader,'write':nextcloud.NextcloudWriter}, + QLISTENER:{'read':qlistener.qListener,'write':qlistener.qListener,'default':{'host':'localhost','port':5672}}, CONSOLE:{'read':qlistener.Console,"write":qlistener.Console}, HTTP:{'read':session.HttpReader,'write':session.HttpWriter}, - DATABRICKS:{'read':bricks.BricksReader,'write':bricks.BricksWriter}, + MONGODB:{'read':mongo.MongoReader,'write':mongo.MongoWriter,'default':{'port':27017,'host':'localhost'}}, COUCHDB:{'read':couch.CouchReader,'writer':couch.CouchWriter,'default':{'host':'localhost','port':5984}}, ETL :{'read':etl.Transporter,'write':etl.Transporter} @@ -92,4 +99,4 @@ DEFAULT = {PG:{'host':'localhost','port':5432},MYSQL:{'host':'localhost','port': DEFAULT[MONGODB] = {'port':27017,'host':'localhost'} DEFAULT[REDSHIFT] = DEFAULT[PG] DEFAULT[MARIADB] = DEFAULT[MYSQL] -DEFAULT[NETEZZA] = {'port':5480} \ No newline at end of file +DEFAULT[NETEZZA] = {'port':5480} diff --git a/transport/version.py b/transport/version.py index 5e7e7b7..3fa6e8d 100644 --- a/transport/version.py +++ b/transport/version.py @@ -1,2 +1,2 @@ __author__ = 'The Phi Technology' -__version__= '1.9.0' +__version__= '1.9.2'