From 0da32069e2ca5245b13d9a70159c95f0dfd53774 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Fri, 8 Sep 2023 11:28:35 -0500 Subject: [PATCH 01/16] databricks support --- transport/__init__.py | 15 +++++- transport/bricks.py | 111 +++++++++++++++++++++++++++++++++++++++++ transport/providers.py | 11 ++-- transport/sql.py | 4 +- transport/version.py | 2 +- 5 files changed, 135 insertions(+), 8 deletions(-) create mode 100644 transport/bricks.py diff --git a/transport/__init__.py b/transport/__init__.py index 8a45800..4c2270c 100644 --- a/transport/__init__.py +++ b/transport/__init__.py @@ -157,20 +157,33 @@ class factory : return anObject import time -def instance(**_args): +def instance(**_pargs): """ creating an instance given the provider, we should have an idea of :class, :driver :provider :read|write = {connection to the database} """ + # + # @TODO: provide authentication file that will hold all the parameters, that will later on be used + # + _args = dict(_pargs,**{}) + if 'auth_file' in _args : + path = _args['auth_file'] + file = open(path) + _config = json.loads( file.read()) + _args = dict(_args,**_config) + file.close() + _provider = _args['provider'] _group = None + for _id in providers.CATEGORIES : if _provider in providers.CATEGORIES[_id] : _group = _id break if _group : + _classPointer = _getClassInstance(_group,**_args) # # Let us reformat the arguments diff --git a/transport/bricks.py b/transport/bricks.py new file mode 100644 index 0000000..0aa4383 --- /dev/null +++ b/transport/bricks.py @@ -0,0 +1,111 @@ +""" +This file implements databricks handling, This functionality will rely on databricks-sql-connector +LICENSE (MIT) +Copyright 2016-2020, The Phi Technology LLC + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + + +@TODO: + - Migrate SQLite to SQL hierarchy + - Include Write in Chunks from pandas +""" +import os +import sqlalchemy +from transport.common import Reader,Writer +import pandas as pd + + +class Bricks: + """ + :host + :token + :database + :cluster_path + :table + """ + def __init__(self,**_args): + _host = _args['host'] + _token= _args['token'] + _cluster_path = _args['cluster_path'] + self._schema = _args['schema'] if 'schema' in _args else _args['database'] + _catalog = _args['catalog'] + self._table = _args['table'] if 'table' in _args else None + + # + # @TODO: + # Sometimes when the cluster isn't up and running it takes a while, the user should be alerted of this + # + + _uri = f'''databricks://token:{_token}@{_host}?http_path={_cluster_path}&catalog={_catalog}&schema={self._schema}''' + self._engine = sqlalchemy.create_engine (_uri) + pass + def meta(self,**_args): + table = _args['table'] if 'table' in _args else self._table + if not table : + return [] + else: + if sqlalchemy.__version__.startswith('1.') : + _m = sqlalchemy.MetaData(bind=self._engine) + _m.reflect(only=[table]) + else: + _m = sqlalchemy.MetaData() + _m.reflect(bind=self._engine) + # + # Let's retrieve te information associated with a table + # + return [{'name':_attr.name,'type':_attr.type} for _attr in _m.tables[table].columns] + + def has(self,**_args): + return self.meta(**_args) + def apply(self,_sql): + try: + if _sql.lower().startswith('select') : + return pd.read_sql(_sql,self._engine) + except Exception as e: + pass + +class BricksReader(Bricks,Reader): + """ + This class is designed for reads and will execute reads against a table name or a select SQL statement + """ + def __init__(self,**_args): + super().__init__(**_args) + def read(self,**_args): + limit = None if 'limit' not in _args else str(_args['limit']) + + if 'sql' in _args : + sql = _args['sql'] + elif 'table' in _args : + table = _args['table'] + sql = f'SELECT * FROM {table}' + if limit : + sql = sql + f' LIMIT {limit}' + + if 'sql' in _args or 'table' in _args : + return self.apply(sql) + else: + return pd.DataFrame() + pass +class BricksWriter(Bricks,Writer): + def __init__(self,**_args): + super().__init__(**_args) + def write(self,_data,**_args): + """ + This data will write data to data-bricks against a given table. If the table is not specified upon initiazation, it can be specified here + _data: data frame to push to databricks + _args: chunks, table, schema + """ + _schema = self._schema if 'schema' not in _args else _args['schema'] + _table = self._table if 'table' not in _args else _args['table'] + _df = _data if type(_data) == pd.DataFrame else _data + if type(_df) == dict : + _df = [_df] + if type(_df) == list : + _df = pd.DataFrame(_df) + _df.to_sql( + name=_table,schema=_schema, + con=self._engine,if_exists='append',index=False); + pass diff --git a/transport/providers.py b/transport/providers.py index 4fe4784..a638a89 100644 --- a/transport/providers.py +++ b/transport/providers.py @@ -8,6 +8,7 @@ from transport import mongo as mongo from transport import sql as sql from transport import etl as etl from transport import qlistener +from transport import bricks import psycopg2 as pg import mysql.connector as my from google.cloud import bigquery as bq @@ -45,16 +46,18 @@ AWS_S3 = 's3' RABBIT = RABBITMQ QLISTENER = 'qlistener' - +DATABRICKS= 'databricks+connector' DRIVERS = {PG:pg,REDSHIFT:pg,MYSQL:my,MARIADB:my,NETEZZA:nz,SQLITE:sqlite3} -CATEGORIES ={'sql':[NETEZZA,PG,MYSQL,REDSHIFT,SQLITE,MARIADB],'nosql':[MONGODB,COUCHDB],'cloud':[BIGQUERY],'file':[FILE], +CATEGORIES ={'sql':[NETEZZA,PG,MYSQL,REDSHIFT,SQLITE,MARIADB],'nosql':[MONGODB,COUCHDB],'cloud':[BIGQUERY,DATABRICKS],'file':[FILE], 'queue':[RABBIT,QLISTENER],'memory':[CONSOLE,QLISTENER],'http':[HTTP]} -READ = {'sql':sql.SQLReader,'nosql':{MONGODB:mongo.MongoReader,COUCHDB:couch.CouchReader},'cloud':sql.BigQueryReader, +READ = {'sql':sql.SQLReader,'nosql':{MONGODB:mongo.MongoReader,COUCHDB:couch.CouchReader}, + 'cloud':{BIGQUERY:sql.BigQueryReader,DATABRICKS:bricks.BricksReader}, 'file':disk.DiskReader,'queue':{RABBIT:queue.QueueReader,QLISTENER:qlistener.qListener}, 'cli':{CONSOLE:Console},'memory':{CONSOLE:Console} } -WRITE = {'sql':sql.SQLWriter,'nosql':{MONGODB:mongo.MongoWriter,COUCHDB:couch.CouchWriter},'cloud':sql.BigQueryWriter, +WRITE = {'sql':sql.SQLWriter,'nosql':{MONGODB:mongo.MongoWriter,COUCHDB:couch.CouchWriter}, + 'cloud':{BIGQUERY:sql.BigQueryWriter,DATABRICKS:bricks.BricksWriter}, 'file':disk.DiskWriter,'queue':{RABBIT:queue.QueueWriter,QLISTENER:qlistener.qListener},'cli':{CONSOLE:Console},'memory':{CONSOLE:Console} } diff --git a/transport/sql.py b/transport/sql.py index da412fa..3c555f5 100644 --- a/transport/sql.py +++ b/transport/sql.py @@ -431,8 +431,8 @@ class BQReader(BigQuery,Reader) : super().__init__(**_args) def apply(self,sql): - self.read(sql=sql) - pass + return self.read(sql=sql) + def read(self,**_args): SQL = None table = self.table if 'table' not in _args else _args['table'] diff --git a/transport/version.py b/transport/version.py index 9db71e5..6d0f952 100644 --- a/transport/version.py +++ b/transport/version.py @@ -1,2 +1,2 @@ __author__ = 'The Phi Technology' -__version__= '1.8.2' +__version__= '1.8.4' From 324d81bd167d89c299834f454ff60412dda56af6 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Mon, 18 Sep 2023 20:00:40 -0500 Subject: [PATCH 02/16] bug fix with mysql --- transport/__init__.py | 1 - transport/mongo.py | 10 ++++++++-- transport/providers.py | 2 +- transport/version.py | 2 +- 4 files changed, 10 insertions(+), 5 deletions(-) diff --git a/transport/__init__.py b/transport/__init__.py index 4c2270c..bbb2e50 100644 --- a/transport/__init__.py +++ b/transport/__init__.py @@ -242,7 +242,6 @@ def _get_alchemyEngine(**_args): uri = ''.join([_provider,"://",_account,_fhost,'/',_database]) - _engine = sqlalchemy.create_engine (uri,future=True) _out = {'sqlalchemy':_engine} _pargs = {'host':_host,'port':_port,'username':_username,'password':_password} diff --git a/transport/mongo.py b/transport/mongo.py index 96c9075..c24b4b8 100644 --- a/transport/mongo.py +++ b/transport/mongo.py @@ -95,10 +95,16 @@ class MongoReader(Mongo,Reader): Mongo.__init__(self,**args) def read(self,**args): - if 'mongo' in args or 'cmd' in args: + if 'mongo' in args or 'cmd' in args or 'pipeline' in args: # # @TODO: - cmd = args['mongo'] if 'mongo' in args else args['cmd'] + cmd = {} + if 'pipeline' in args : + cmd['pipeline']= args['pipeline'] + if 'aggregate' not in cmd : + cmd['aggregate'] = self.collection + if 'pipeline' not in args or 'aggregate' not in cmd : + cmd = args['mongo'] if 'mongo' in args else args['cmd'] if "aggregate" in cmd : if "allowDiskUse" not in cmd : cmd["allowDiskUse"] = True diff --git a/transport/providers.py b/transport/providers.py index a638a89..c1c4bae 100644 --- a/transport/providers.py +++ b/transport/providers.py @@ -27,7 +27,7 @@ SQLITE = 'sqlite' SQLITE3= 'sqlite' REDSHIFT = 'redshift' NETEZZA = 'netezza' -MYSQL = 'mysql' +MYSQL = 'mysql+mysqlconnector' RABBITMQ = 'rabbitmq' MARIADB = 'mariadb' COUCHDB = 'couch' diff --git a/transport/version.py b/transport/version.py index 6d0f952..ec087c4 100644 --- a/transport/version.py +++ b/transport/version.py @@ -1,2 +1,2 @@ __author__ = 'The Phi Technology' -__version__= '1.8.4' +__version__= '1.8.6' From 3f7f3d7306f6339f7bb945936af522ea69017296 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Fri, 29 Sep 2023 20:27:53 -0500 Subject: [PATCH 03/16] refactor: factory, etl, fixes: session --- bin/transport | 178 +++++++++++------ setup.py | 2 +- transport/__init__.py | 113 ++++++----- transport/common.py | 46 ++--- transport/etl.py | 435 +++++++++++++++++++++++++---------------- transport/providers.py | 33 +++- transport/qlistener.py | 5 + transport/session.py | 100 ++++++---- transport/sql.py | 22 +-- transport/version.py | 2 +- 10 files changed, 593 insertions(+), 343 deletions(-) diff --git a/bin/transport b/bin/transport index 8edaecc..2225f3b 100755 --- a/bin/transport +++ b/bin/transport @@ -14,19 +14,27 @@ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLI Usage : - transport --config --procs -@TODO: Create tables if they don't exist for relational databases -example of configuration : - -1. Move data from a folder to a data-store - transport [--folder ] --config #-- assuming the configuration doesn't have folder - transport --folder --provider -- --table|doc -In this case the configuration should look like : - {folder:..., target:{}} -2. Move data from one source to another - transport --config - {source:{..},target:{..}} or [{source:{..},target:{..}},{source:{..},target:{..}}] - + transport help -- will print this page + + transport move [index] + path to the configuration file + optional index within the configuration file + +e.g: configuration file (JSON formatted) + - single source to a single target + + {"source":{"provider":"http","url":"https://cdn.wsform.com/wp-content/uploads/2020/06/agreement.csv"} + "target":{"provider":"sqlite3","path":"transport-demo.sqlite","table":"agreement"} + } + + - single source to multiple targets + { + "source":{"provider":"http","url":"https://cdn.wsform.com/wp-content/uploads/2020/06/agreement.csv"}, + "target":[ + {"provider":"sqlite3","path":"transport-demo.sqlite","table":"agreement}, + {"provider":"mongodb","db":"transport-demo","collection":"agreement"} + ] + } """ import pandas as pd @@ -36,51 +44,111 @@ import sys import transport import time from multiprocessing import Process -SYS_ARGS = {} -if len(sys.argv) > 1: +import typer +import os +from transport import etl +from transport import providers + +# SYS_ARGS = {} +# if len(sys.argv) > 1: - N = len(sys.argv) - for i in range(1,N): - value = None - if sys.argv[i].startswith('--'): - key = sys.argv[i][2:] #.replace('-','') - SYS_ARGS[key] = 1 - if i + 1 < N: - value = sys.argv[i + 1] = sys.argv[i+1].strip() - if key and value and not value.startswith('--'): - SYS_ARGS[key] = value +# N = len(sys.argv) +# for i in range(1,N): +# value = None +# if sys.argv[i].startswith('--'): +# key = sys.argv[i][2:] #.replace('-','') +# SYS_ARGS[key] = 1 +# if i + 1 < N: +# value = sys.argv[i + 1] = sys.argv[i+1].strip() +# if key and value and not value.startswith('--'): +# SYS_ARGS[key] = value - i += 2 - -if __name__ == '__main__' : - # - # Load information from the file ... - if 'help' in SYS_ARGS : - print (__doc__) - else: - try: - _info = json.loads(open(SYS_ARGS['config']).read()) - if 'index' in SYS_ARGS : - _index = int(SYS_ARGS['index']) - _info = [_item for _item in _info if _info.index(_item) == _index] - pass - elif 'id' in SYS_ARGS : - _info = [_item for _item in _info if 'id' in _item and _item['id'] == SYS_ARGS['id']] +# i += 2 + +app = typer.Typer() + +# @app.command() +def help() : + print (__doc__) +def wait(jobs): + while jobs : + jobs = [thread for thread in jobs if thread.is_alive()] + time.sleep(1) + +@app.command() +def move (path,index=None): + + _proxy = lambda _object: _object.write(_object.read()) + if os.path.exists(path): + file = open(path) + _config = json.loads (file.read() ) + file.close() + if index : + _config = _config[ int(index)] + etl.instance(**_config) + else: + etl.instance(_config) + + # + # if type(_config) == dict : + # _object = transport.etl.instance(**_config) + # _proxy(_object) + # else: + # # + # # here we are dealing with a list of objects (long ass etl job) + # jobs = [] + # failed = [] + # for _args in _config : + # if index and _config.index(_args) != index : + # continue + + # _object=transport.etl.instance(**_args) + # thread = Process(target=_proxy,args=(_object,)) + # thread.start() + # jobs.append(thread()) + # if _config.index(_args) == 0 : + # thread.join() + wait(jobs) + +@app.command() +def generate (path:str): + __doc__=""" + + """ + _config = [{"source":{"provider":"http","url":"https://cdn.wsform.com/wp-content/uploads/2020/06/agreement.csv"},"target":{"provider":"file","path":"addresses.csv","delimiter":"csv"}}] + file = open(path,'w') + file.write(json.dumps(_config)) + file.close() + +# if __name__ == '__main__' : +# # +# # Load information from the file ... +# if 'help' in SYS_ARGS : +# print (__doc__) +# else: +# try: +# _info = json.loads(open(SYS_ARGS['config']).read()) +# if 'index' in SYS_ARGS : +# _index = int(SYS_ARGS['index']) +# _info = [_item for _item in _info if _info.index(_item) == _index] +# pass +# elif 'id' in SYS_ARGS : +# _info = [_item for _item in _info if 'id' in _item and _item['id'] == SYS_ARGS['id']] - procs = 1 if 'procs' not in SYS_ARGS else int(SYS_ARGS['procs']) - jobs = transport.factory.instance(provider='etl',info=_info,procs=procs) - print ([len(jobs),' Jobs are running']) - N = len(jobs) - while jobs : - x = len(jobs) - jobs = [_job for _job in jobs if _job.is_alive()] - if x != len(jobs) : - print ([len(jobs),'... jobs still running']) - time.sleep(1) - print ([N,' Finished running']) - except Exception as e: +# procs = 1 if 'procs' not in SYS_ARGS else int(SYS_ARGS['procs']) +# jobs = transport.factory.instance(provider='etl',info=_info,procs=procs) +# print ([len(jobs),' Jobs are running']) +# N = len(jobs) +# while jobs : +# x = len(jobs) +# jobs = [_job for _job in jobs if _job.is_alive()] +# if x != len(jobs) : +# print ([len(jobs),'... jobs still running']) +# time.sleep(1) +# print ([N,' Finished running']) +# except Exception as e: - print (e) +# print (e) - \ No newline at end of file + diff --git a/setup.py b/setup.py index 7eff1e4..254bb5c 100644 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ args = { "license":"MIT", "packages":["transport"]} args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite'] -args["install_requires"] = ['pymongo','sqlalchemy<2.0.0','pandas','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python'] +args["install_requires"] = ['pymongo','sqlalchemy<2.0.0','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python'] args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git" args['scripts'] = ['bin/transport'] if sys.version_info[0] == 2 : diff --git a/transport/__init__.py b/transport/__init__.py index bbb2e50..e139aa5 100644 --- a/transport/__init__.py +++ b/transport/__init__.py @@ -28,7 +28,7 @@ import importlib import sys import sqlalchemy if sys.version_info[0] > 2 : - from transport.common import Reader, Writer,Console #, factory + # from transport.common import Reader, Writer,Console #, factory from transport import disk from transport import s3 as s3 @@ -97,7 +97,7 @@ class factory : TYPE = {"sql":{"providers":["postgresql","mysql","neteeza","bigquery","mariadb","redshift"]}} PROVIDERS = { "etl":{"class":{"read":etl.instance,"write":etl.instance}}, - "console":{"class":{"write":Console,"read":Console}}, + # "console":{"class":{"write":Console,"read":Console}}, "file":{"class":{"read":disk.DiskReader,"write":disk.DiskWriter}}, "sqlite":{"class":{"read":disk.SQLiteReader,"write":disk.SQLiteWriter}}, "postgresql":{"port":5432,"host":"localhost","database":None,"driver":pg,"default":{"type":"VARCHAR"},"class":{"read":sql.SQLReader,"write":sql.SQLWriter}}, @@ -124,6 +124,9 @@ class factory : # # Legacy code being returned return factory._instance(**_args); + + + else: return instance(**_args) @staticmethod @@ -175,22 +178,31 @@ def instance(**_pargs): file.close() _provider = _args['provider'] - _group = None + _context = list( set(['read','write','listen']) & set(_args.keys()) ) + if _context : + _context = _context[0] + else: + _context = _args['context'] if 'context' in _args else 'read' + # _group = None - for _id in providers.CATEGORIES : - if _provider in providers.CATEGORIES[_id] : - _group = _id - break - if _group : + # for _id in providers.CATEGORIES : + # if _provider in providers.CATEGORIES[_id] : + # _group = _id + # break + # if _group : + + if _provider in providers.PROVIDERS and _context in providers.PROVIDERS[_provider]: - _classPointer = _getClassInstance(_group,**_args) + # _classPointer = _getClassInstance(_group,**_args) + _classPointer = providers.PROVIDERS[_provider][_context] # # Let us reformat the arguments - if 'read' in _args or 'write' in _args : - _args = _args['read'] if 'read' in _args else _args['write'] - _args['provider'] = _provider - if _group == 'sql' : + # if 'read' in _args or 'write' in _args : + # _args = _args['read'] if 'read' in _args else _args['write'] + # _args['provider'] = _provider + # if _group == 'sql' : + if _provider in providers.CATEGORIES['sql'] : _info = _get_alchemyEngine(**_args) _args = dict(_args,**_info) @@ -215,57 +227,68 @@ def _get_alchemyEngine(**_args): This function returns the SQLAlchemy engine associated with parameters, This is only applicable for SQL _items :_args arguments passed to the factory {provider and other} """ - #@TODO: Enable authentication files (private_key) - _username = _args['username'] if 'username' in _args else '' - _password = _args['password'] if 'password' in _args else '' - _account = _args['account'] if 'account' in _args else '' - _database = _args['database'] _provider = _args['provider'] - if _username != '': - _account = _username + ':'+_password+'@' - _host = _args['host'] if 'host' in _args else '' - _port = _args['port'] if 'port' in _args else '' - if _provider in providers.DEFAULT : - _default = providers.DEFAULT[_provider] - _host = _host if _host != '' else (_default['host'] if 'host' in _default else '') - _port = _port if _port != '' else (_default['port'] if 'port' in _default else '') - if _port == '': - _port = providers.DEFAULT['port'] if 'port' in providers.DEFAULT else '' - # - - if _host != '' and _port != '' : - _fhost = _host+":"+str(_port) #--formatted hostname + _pargs = {} + if _provider == providers.SQLITE3 : + _path = _args['database'] if 'database' in _args else _args['path'] + uri = ''.join([_provider,':///',_path]) + else: - _fhost = _host - # Let us update the parameters we have thus far + + #@TODO: Enable authentication files (private_key) + _username = _args['username'] if 'username' in _args else '' + _password = _args['password'] if 'password' in _args else '' + _account = _args['account'] if 'account' in _args else '' + _database = _args['database'] if 'database' in _args else _args['path'] + + if _username != '': + _account = _username + ':'+_password+'@' + _host = _args['host'] if 'host' in _args else '' + _port = _args['port'] if 'port' in _args else '' + if _provider in providers.DEFAULT : + _default = providers.DEFAULT[_provider] + _host = _host if _host != '' else (_default['host'] if 'host' in _default else '') + _port = _port if _port != '' else (_default['port'] if 'port' in _default else '') + if _port == '': + _port = providers.DEFAULT['port'] if 'port' in providers.DEFAULT else '' + # + + if _host != '' and _port != '' : + _fhost = _host+":"+str(_port) #--formatted hostname + else: + _fhost = _host + # Let us update the parameters we have thus far # - uri = ''.join([_provider,"://",_account,_fhost,'/',_database]) + uri = ''.join([_provider,"://",_account,_fhost,'/',_database]) + _pargs = {'host':_host,'port':_port,'username':_username,'password':_password} _engine = sqlalchemy.create_engine (uri,future=True) _out = {'sqlalchemy':_engine} - _pargs = {'host':_host,'port':_port,'username':_username,'password':_password} + for key in _pargs : if _pargs[key] != '' : _out[key] = _pargs[key] return _out +@DeprecationWarning def _getClassInstance(_group,**_args): """ This function returns the class instance we are attempting to instanciate :_group items in providers.CATEGORIES.keys() :_args arguments passed to the factory class """ - if 'read' in _args or 'write' in _args : - _context = 'read' if 'read' in _args else _args['write'] - _info = _args[_context] - else: - _context = _args['context'] if 'context' in _args else 'read' - _class = providers.READ[_group] if _context == 'read' else providers.WRITE[_group] - if type(_class) == dict and _args['provider'] in _class: - _class = _class[_args['provider']] + # if 'read' in _args or 'write' in _args : + # _context = 'read' if 'read' in _args else _args['write'] + # _info = _args[_context] + # else: + # _context = _args['context'] if 'context' in _args else 'read' + # _class = providers.READ[_group] if _context == 'read' else providers.WRITE[_group] + # if type(_class) == dict and _args['provider'] in _class: + # _class = _class[_args['provider']] - return _class + # return _class +@DeprecationWarning def __instance(**_args): """ diff --git a/transport/common.py b/transport/common.py index 39df6a3..59f57ea 100644 --- a/transport/common.py +++ b/transport/common.py @@ -93,29 +93,29 @@ class ReadWriter(Reader,Writer) : This class implements the read/write functions aggregated """ pass -class Console(Writer): - lock = RLock() - def __init__(self,**_args): - self.lock = _args['lock'] if 'lock' in _args else False - self.info = self.write - self.debug = self.write - self.log = self.write - pass - def write (self,logs=None,**_args): - if self.lock : - Console.lock.acquire() - try: - _params = _args if logs is None and _args else logs - if type(_params) == list: - for row in _params : - print (row) - else: - print (_params) - except Exception as e : - print (e) - finally: - if self.lock : - Console.lock.release() +# class Console(Writer): +# lock = RLock() +# def __init__(self,**_args): +# self.lock = _args['lock'] if 'lock' in _args else False +# self.info = self.write +# self.debug = self.write +# self.log = self.write +# pass +# def write (self,logs=None,**_args): +# if self.lock : +# Console.lock.acquire() +# try: +# _params = _args if logs is None and _args else logs +# if type(_params) == list: +# for row in _params : +# print (row) +# else: +# print (_params) +# except Exception as e : +# print (e) +# finally: +# if self.lock : +# Console.lock.release() """ diff --git a/transport/etl.py b/transport/etl.py index 9d520d4..dac58c4 100644 --- a/transport/etl.py +++ b/transport/etl.py @@ -35,6 +35,9 @@ import json import sys import transport import time +import os + + from multiprocessing import Process SYS_ARGS = {} if len(sys.argv) > 1: @@ -52,199 +55,301 @@ if len(sys.argv) > 1: i += 2 - -class Post(Process): - def __init__(self,**args): +class Transporter(Process): + """ + The transporter (Jason Stathem) moves data from one persistant store to another + - callback functions + :onFinish callback function when finished + :onError callback function when an error occurs + :source source data specification + :target destination(s) to move the data to + """ + def __init__(self,**_args): super().__init__() - self.store = args['target'] - if 'provider' not in args['target'] : - pass - self.PROVIDER = args['target']['type'] - # self.writer = transport.factory.instance(**args['target']) - else: - self.PROVIDER = args['target']['provider'] - self.store['context'] = 'write' - # self.store = args['target'] - self.store['lock'] = True - # self.writer = transport.instance(**args['target']) + # self.onfinish = _args['onFinish'] + # self._onerror = _args['onError'] + self._source = _args['source'] + self._target = _args['target'] + # - # If the table doesn't exists maybe create it ? + # Let's insure we can support multiple targets + self._target = [self._target] if type(self._target) != list else self._target + + pass + def read(self,**_args): + """ + This function + """ + _reader = transport.factory.instance(**self._source) # - self.rows = args['rows'] - # self.rows = args['rows'].fillna('') + # If arguments are provided then a query is to be executed (not just a table dump) + return _reader.read() if 'args' not in self._source else _reader.read(**self._source['args']) + + def _delegate_write(self,_data,**_args): + """ + This function will write a data-frame to a designated data-store, The function is built around a delegation design pattern + :data data-frame or object to be written + """ + for _target in self._target : + if 'write' not in _target : + _target['context'] = 'write' + _target['lock'] = True + else: + _target['write']['lock'] = True + _writer = transport.factory.instance(**_target) + _writer.write(_data,**_args) + if hasattr(_writer,'close') : + _writer.close() + + def write(self,_df,**_args): + """ + """ + SEGMENT_COUNT = 6 + MAX_ROWS = 1000000 + # _df = self.read() + _segments = np.array_split(np.range(_df.shape[0]),SEGMENT_COUNT) if _df.shape[0] > MAX_ROWS else np.array( [np.arange(_df.shape[0])]) + # _index = 0 + + + for _indexes in _segments : + _fwd_args = {} if not _args else _args + + self._delegate_write(_df.iloc[_indexes],**_fwd_args) + # + # @TODO: Perhaps consider writing up each segment in a thread/process (speeds things up?) + pass + +def instance(**_args): + _proxy = lambda _agent: _agent.write(_agent.read()) + if 'source' in _args and 'target' in _args : + + _agent = Transporter(**_args) + _proxy(_agent) + + else: + _config = _args['config'] + _items = [Transporter(**_item) for _item in _config ] + _MAX_JOBS = 5 + _items = np.array_split(_items,_MAX_JOBS) + for _batch in _items : + jobs = [] + for _item in _batch : + thread = Process(target=_proxy,args = (_item,)) + thread.start() + jobs.append(thread) + while jobs : + jobs = [thread for thread in jobs if thread.is_alive()] + time.sleep(1) + + pass +# class Post(Process): +# def __init__(self,**args): +# super().__init__() +# self.store = args['target'] +# if 'provider' not in args['target'] : +# pass +# self.PROVIDER = args['target']['type'] +# # self.writer = transport.factory.instance(**args['target']) +# else: +# self.PROVIDER = args['target']['provider'] +# self.store['context'] = 'write' +# # self.store = args['target'] +# self.store['lock'] = True +# # self.writer = transport.instance(**args['target']) +# # +# # If the table doesn't exists maybe create it ? +# # +# self.rows = args['rows'] +# # self.rows = args['rows'].fillna('') - def log(self,**_args) : - if ETL.logger : - ETL.logger.info(**_args) +# def log(self,**_args) : +# if ETL.logger : +# ETL.logger.info(**_args) - def run(self): - _info = {"values":self.rows} if 'couch' in self.PROVIDER else self.rows +# def run(self): +# _info = {"values":self.rows} if 'couch' in self.PROVIDER else self.rows - writer = transport.factory.instance(**self.store) - writer.write(_info) - writer.close() +# writer = transport.factory.instance(**self.store) +# writer.write(_info) +# writer.close() -class ETL (Process): - logger = None - def __init__(self,**_args): - super().__init__() +# class ETL (Process): +# logger = None +# def __init__(self,**_args): +# super().__init__() - self.name = _args['id'] if 'id' in _args else 'UNREGISTERED' - # if 'provider' not in _args['source'] : - # #@deprecate - # self.reader = transport.factory.instance(**_args['source']) - # else: - # # - # # This is the new interface - # _args['source']['context'] = 'read' +# self.name = _args['id'] if 'id' in _args else 'UNREGISTERED' +# # if 'provider' not in _args['source'] : +# # #@deprecate +# # self.reader = transport.factory.instance(**_args['source']) +# # else: +# # # +# # # This is the new interface +# # _args['source']['context'] = 'read' - # self.reader = transport.instance(**_args['source']) +# # self.reader = transport.instance(**_args['source']) - # - # do we have an sql query provided or not .... - # self.sql = _args['source']['sql'] if 'sql' in _args['source'] else None - # self.cmd = _args['source']['cmd'] if 'cmd' in _args['source'] else None - # self._oargs = _args['target'] #transport.factory.instance(**_args['target']) - self._source = _args ['source'] - self._target = _args['target'] - self._source['context'] = 'read' - self._target['context'] = 'write' +# # +# # do we have an sql query provided or not .... +# # self.sql = _args['source']['sql'] if 'sql' in _args['source'] else None +# # self.cmd = _args['source']['cmd'] if 'cmd' in _args['source'] else None +# # self._oargs = _args['target'] #transport.factory.instance(**_args['target']) +# self._source = _args ['source'] +# self._target = _args['target'] +# self._source['context'] = 'read' +# self._target['context'] = 'write' - self.JOB_COUNT = _args['jobs'] - self.jobs = [] - # self.logger = transport.factory.instance(**_args['logger']) - def log(self,**_args) : - if ETL.logger : - ETL.logger.info(**_args) +# self.JOB_COUNT = _args['jobs'] +# self.jobs = [] +# # self.logger = transport.factory.instance(**_args['logger']) +# def log(self,**_args) : +# if ETL.logger : +# ETL.logger.info(**_args) - def run(self): - # if self.cmd : - # idf = self.reader.read(**self.cmd) - # else: - # idf = self.reader.read() - # idf = pd.DataFrame(idf) - # # idf = idf.replace({np.nan: None}, inplace = True) +# def run(self): +# # if self.cmd : +# # idf = self.reader.read(**self.cmd) +# # else: +# # idf = self.reader.read() +# # idf = pd.DataFrame(idf) +# # # idf = idf.replace({np.nan: None}, inplace = True) - # idf.columns = [str(name).replace("b'",'').replace("'","").strip() for name in idf.columns.tolist()] - # self.log(rows=idf.shape[0],cols=idf.shape[1],jobs=self.JOB_COUNT) +# # idf.columns = [str(name).replace("b'",'').replace("'","").strip() for name in idf.columns.tolist()] +# # self.log(rows=idf.shape[0],cols=idf.shape[1],jobs=self.JOB_COUNT) - # - # writing the data to a designated data source - # - try: +# # +# # writing the data to a designated data source +# # +# try: - _log = {"name":self.name,"rows":{"input":0,"output":0}} - _reader = transport.factory.instance(**self._source) - if 'table' in self._source : - _df = _reader.read() - else: - _df = _reader.read(**self._source['cmd']) - _log['rows']['input'] = _df.shape[0] - # - # Let's write the input data-frame to the target ... - _writer = transport.factory.instance(**self._target) - _writer.write(_df) - _log['rows']['output'] = _df.shape[0] +# _log = {"name":self.name,"rows":{"input":0,"output":0}} +# _reader = transport.factory.instance(**self._source) +# if 'table' in self._source : +# _df = _reader.read() +# else: +# _df = _reader.read(**self._source['cmd']) +# _log['rows']['input'] = _df.shape[0] +# # +# # Let's write the input data-frame to the target ... +# _writer = transport.factory.instance(**self._target) +# _writer.write(_df) +# _log['rows']['output'] = _df.shape[0] - # self.log(module='write',action='partitioning',jobs=self.JOB_COUNT) - # rows = np.array_split(np.arange(0,idf.shape[0]),self.JOB_COUNT) +# # self.log(module='write',action='partitioning',jobs=self.JOB_COUNT) +# # rows = np.array_split(np.arange(0,idf.shape[0]),self.JOB_COUNT) - # # - # # @TODO: locks - # for i in np.arange(self.JOB_COUNT) : - # # _id = ' '.join([str(i),' table ',self.name]) - # indexes = rows[i] - # segment = idf.loc[indexes,:].copy() #.to_dict(orient='records') - # _name = "partition-"+str(i) - # if segment.shape[0] == 0 : - # continue +# # # +# # # @TODO: locks +# # for i in np.arange(self.JOB_COUNT) : +# # # _id = ' '.join([str(i),' table ',self.name]) +# # indexes = rows[i] +# # segment = idf.loc[indexes,:].copy() #.to_dict(orient='records') +# # _name = "partition-"+str(i) +# # if segment.shape[0] == 0 : +# # continue - # proc = Post(target = self._oargs,rows = segment,name=_name) - # self.jobs.append(proc) - # proc.start() +# # proc = Post(target = self._oargs,rows = segment,name=_name) +# # self.jobs.append(proc) +# # proc.start() - # self.log(module='write',action='working',segment=str(self.name),table=self.name,rows=segment.shape[0]) - # while self.jobs : - # jobs = [job for job in proc if job.is_alive()] - # time.sleep(1) - except Exception as e: - print (e) - self.log(**_log) - def is_done(self): - self.jobs = [proc for proc in self.jobs if proc.is_alive()] - return len(self.jobs) == 0 -def instance(**_args): - """ - :path ,index, id - :param _info list of objects with {source,target}` - :param logger - """ - logger = _args['logger'] if 'logger' in _args else None - if 'path' in _args : - _info = json.loads((open(_args['path'])).read()) +# # self.log(module='write',action='working',segment=str(self.name),table=self.name,rows=segment.shape[0]) +# # while self.jobs : +# # jobs = [job for job in proc if job.is_alive()] +# # time.sleep(1) +# except Exception as e: +# print (e) +# self.log(**_log) +# def is_done(self): +# self.jobs = [proc for proc in self.jobs if proc.is_alive()] +# return len(self.jobs) == 0 + + +# def instance (**_args): +# """ +# path to configuration file +# """ +# _path = _args['path'] +# _config = {} +# jobs = [] +# if os.path.exists(_path) : +# file = open(_path) +# _config = json.loads(file.read()) +# file.close() +# if _config and type + + +# def _instance(**_args): +# """ +# :path ,index, id +# :param _info list of objects with {source,target}` +# :param logger +# """ +# logger = _args['logger'] if 'logger' in _args else None +# if 'path' in _args : +# _info = json.loads((open(_args['path'])).read()) - if 'index' in _args : - _index = int(_args['index']) - _info = _info[_index] +# if 'index' in _args : +# _index = int(_args['index']) +# _info = _info[_index] - elif 'id' in _args : - _info = [_item for _item in _info if '_id' in _item and _item['id'] == _args['id']] - _info = _info[0] if _info else _info - else: - _info = _args['info'] +# elif 'id' in _args : +# _info = [_item for _item in _info if '_id' in _item and _item['id'] == _args['id']] +# _info = _info[0] if _info else _info +# else: +# _info = _args['info'] - if logger and type(logger) != str: - ETL.logger = logger - elif logger == 'console': - ETL.logger = transport.factory.instance(provider='console',context='write',lock=True) - if type(_info) in [list,dict] : - _info = _info if type(_info) != dict else [_info] - # - # The assumption here is that the objects within the list are {source,target} - jobs = [] - for _item in _info : +# if logger and type(logger) != str: +# ETL.logger = logger +# elif logger == 'console': +# ETL.logger = transport.factory.instance(provider='console',context='write',lock=True) +# if type(_info) in [list,dict] : +# _info = _info if type(_info) != dict else [_info] +# # +# # The assumption here is that the objects within the list are {source,target} +# jobs = [] +# for _item in _info : - _item['jobs'] = 5 if 'procs' not in _args else int(_args['procs']) - _job = ETL(**_item) +# _item['jobs'] = 5 if 'procs' not in _args else int(_args['procs']) +# _job = ETL(**_item) - _job.start() - jobs.append(_job) - return jobs +# _job.start() +# jobs.append(_job) +# return jobs - else: - return None - -if __name__ == '__main__' : - _info = json.loads(open (SYS_ARGS['config']).read()) - index = int(SYS_ARGS['index']) if 'index' in SYS_ARGS else None - procs = [] - for _config in _info : - if 'source' in SYS_ARGS : - _config['source'] = {"type":"disk.DiskReader","args":{"path":SYS_ARGS['source'],"delimiter":","}} - - _config['jobs'] = 3 if 'jobs' not in SYS_ARGS else int(SYS_ARGS['jobs']) - etl = ETL (**_config) - if index is None: +# else: +# return None + +# if __name__ == '__main__' : +# _info = json.loads(open (SYS_ARGS['config']).read()) +# index = int(SYS_ARGS['index']) if 'index' in SYS_ARGS else None +# procs = [] +# for _config in _info : +# if 'source' in SYS_ARGS : +# _config['source'] = {"type":"disk.DiskReader","args":{"path":SYS_ARGS['source'],"delimiter":","}} + +# _config['jobs'] = 3 if 'jobs' not in SYS_ARGS else int(SYS_ARGS['jobs']) +# etl = ETL (**_config) +# if index is None: - etl.start() - procs.append(etl) +# etl.start() +# procs.append(etl) - elif _info.index(_config) == index : +# elif _info.index(_config) == index : - # print (_config) - procs = [etl] - etl.start() - break - # - # - N = len(procs) - while procs : - procs = [thread for thread in procs if not thread.is_done()] - if len(procs) < N : - print (["Finished ",(N-len(procs)), " remaining ", len(procs)]) - N = len(procs) - time.sleep(1) - # print ("We're done !!") \ No newline at end of file +# # print (_config) +# procs = [etl] +# etl.start() +# break +# # +# # +# N = len(procs) +# while procs : +# procs = [thread for thread in procs if not thread.is_done()] +# if len(procs) < N : +# print (["Finished ",(N-len(procs)), " remaining ", len(procs)]) +# N = len(procs) +# time.sleep(1) +# # print ("We're done !!") \ No newline at end of file diff --git a/transport/providers.py b/transport/providers.py index c1c4bae..fc394f3 100644 --- a/transport/providers.py +++ b/transport/providers.py @@ -1,4 +1,4 @@ -from transport.common import Reader, Writer,Console #, factory +# from transport.common import Reader, Writer,Console #, factory from transport import disk import sqlite3 from transport import s3 as s3 @@ -9,6 +9,7 @@ from transport import sql as sql from transport import etl as etl from transport import qlistener from transport import bricks +from transport import session import psycopg2 as pg import mysql.connector as my from google.cloud import bigquery as bq @@ -33,6 +34,8 @@ MARIADB = 'mariadb' COUCHDB = 'couch' CONSOLE = 'console' ETL = 'etl' + + # # synonyms of the above BQ = BIGQUERY @@ -54,13 +57,37 @@ CATEGORIES ={'sql':[NETEZZA,PG,MYSQL,REDSHIFT,SQLITE,MARIADB],'nosql':[MONGODB,C READ = {'sql':sql.SQLReader,'nosql':{MONGODB:mongo.MongoReader,COUCHDB:couch.CouchReader}, 'cloud':{BIGQUERY:sql.BigQueryReader,DATABRICKS:bricks.BricksReader}, 'file':disk.DiskReader,'queue':{RABBIT:queue.QueueReader,QLISTENER:qlistener.qListener}, - 'cli':{CONSOLE:Console},'memory':{CONSOLE:Console} + # 'cli':{CONSOLE:Console},'memory':{CONSOLE:Console},'http':session.HttpReader } WRITE = {'sql':sql.SQLWriter,'nosql':{MONGODB:mongo.MongoWriter,COUCHDB:couch.CouchWriter}, 'cloud':{BIGQUERY:sql.BigQueryWriter,DATABRICKS:bricks.BricksWriter}, - 'file':disk.DiskWriter,'queue':{RABBIT:queue.QueueWriter,QLISTENER:qlistener.qListener},'cli':{CONSOLE:Console},'memory':{CONSOLE:Console} + 'file':disk.DiskWriter,'queue':{RABBIT:queue.QueueWriter,QLISTENER:qlistener.qListener}, + # 'cli':{CONSOLE:Console}, + # 'memory':{CONSOLE:Console}, 'http':session.HttpReader } +# SQL_PROVIDERS = [POSTGRESQL,MYSQL,NETEZZA,MARIADB,SQLITE] +PROVIDERS = { + FILE:{'read':disk.DiskReader,'write':disk.DiskWriter}, + SQLITE:{'read':disk.SQLiteReader,'write':disk.SQLiteWriter,'driver':sqlite3}, + + POSTGRESQL:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':pg,'default':{'host':'localhost','port':5432}}, + NETEZZA:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':nz,'default':{'port':5480}}, + REDSHIFT:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':pg,'default':{'host':'localhost','port':5432}}, + RABBITMQ:{'read':queue.QueueReader,'writer':queue.QueueWriter,'context':queue.QueueListener,'default':{'host':'localhost','port':5432}}, + + MYSQL:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':my,'default':{'host':'localhost','port':3306}}, + MARIADB:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':my,'default':{'host':'localhost','port':3306}}, + S3:{'read':s3.s3Reader,'write':s3.s3Writer}, + BIGQUERY:{'read':sql.BigQueryReader,'write':sql.BigQueryWriter}, + QLISTENER:{'read':qlistener.qListener,'write':qlistener.qListener,'default':{'host':'localhost','port':5672}}, + CONSOLE:{'read':qlistener.Console,"write":qlistener.Console}, + HTTP:{'read':session.HttpReader,'write':session.HttpWriter}, + DATABRICKS:{'read':bricks.BricksReader,'write':bricks.BricksWriter}, + MONGODB:{'read':mongo.MongoReader,'write':mongo.MongoWriter,'default':{'port':27017,'host':'localhost'}}, + COUCHDB:{'read':couch.CouchReader,'writer':couch.CouchWriter,'default':{'host':'localhost','port':5984}}, + ETL :{'read':etl.Transporter,'write':etl.Transporter} +} DEFAULT = {PG:{'host':'localhost','port':5432},MYSQL:{'host':'localhost','port':3306}} DEFAULT[MONGODB] = {'port':27017,'host':'localhost'} DEFAULT[REDSHIFT] = DEFAULT[PG] diff --git a/transport/qlistener.py b/transport/qlistener.py index 495b731..26f0ba8 100644 --- a/transport/qlistener.py +++ b/transport/qlistener.py @@ -40,3 +40,8 @@ class qListener : _q = qListener._queue[_id] _q.put(_data) _q.join() +class Console (qListener): + def __init__(self,**_args): + super().__init__(callback=print) + + # self.callback = print \ No newline at end of file diff --git a/transport/session.py b/transport/session.py index 915d2b5..d74669a 100644 --- a/transport/session.py +++ b/transport/session.py @@ -1,54 +1,60 @@ from flask import request, session from datetime import datetime import re -from common import Reader, Writer +from transport.common import Reader, Writer import json +import requests +from io import StringIO +import pandas as pd -class HttpRequestReader(Reader): + +class HttpReader(Reader): """ This class is designed to read data from an Http request file handler provided to us by flask The file will be heald in memory and processed accordingly NOTE: This is inefficient and can crash a micro-instance (becareful) """ - def __init__(self,**params): - self.file_length = 0 - try: - - #self.file = params['file'] - #self.file.seek(0, os.SEEK_END) - #self.file_length = self.file.tell() - - #print 'size of file ',self.file_length - self.content = params['file'].readlines() - self.file_length = len(self.content) - except Exception as e: - print ("Error ... ",e) - pass + def __init__(self,**_args): + self._url = _args['url'] + self._headers = None if 'headers' not in _args else _args['headers'] + + # def isready(self): + # return self.file_length > 0 + def format(self,_response): + _mimetype= _response.headers['Content-Type'] + if _mimetype == 'text/csv' or 'text/csv': + _content = _response.text + return pd.read_csv(StringIO(_content)) + # + # @TODO: Add support for excel, JSON and other file formats that fit into a data-frame + # - def isready(self): - return self.file_length > 0 - def read(self,size =-1): - i = 1 - for row in self.content: - i += 1 - if size == i: - break - yield row + return _response.text + def read(self,**_args): + if self._headers : + r = requests.get(self._url,headers = self._headers) + else: + r = requests.get(self._url,headers = self._headers) + return self.format(r) -class HttpSessionWriter(Writer): +class HttpWriter(Writer): """ - This class is designed to write data to a session/cookie + This class is designed to submit data to an endpoint (url) """ - def __init__(self,**params): + def __init__(self,**_args): """ @param key required session key """ - self.session = params['queue'] - self.session['sql'] = [] - self.session['csv'] = [] - self.tablename = re.sub('..+$','',params['filename']) - self.session['uid'] = params['uid'] + self._url = _args['url'] + self._name = _args['name'] + self._method = 'post' if 'method' not in _args else _args['method'] + + # self.session = params['queue'] + # self.session['sql'] = [] + # self.session['csv'] = [] + # self.tablename = re.sub('..+$','',params['filename']) + # self.session['uid'] = params['uid'] #self.xchar = params['xchar'] @@ -57,10 +63,26 @@ class HttpSessionWriter(Writer): return "".join(["INSERT INTO :table VALUES('",values,"');\n"]).replace(':table',self.tablename) def isready(self): return True - def write(self,**params): - label = params['label'] - row = params ['row'] + def write(self,_data,**_args): + # + # + _method = self._method if 'method' not in _args else _args['method'] + _method = _method.lower() + _mimetype = 'text/csv' + if type(_data) == dict : + _mimetype = 'application/json' + _content = _data + else: + _content = _data.to_dict(orient='records') + _headers = {'Content-Type':_mimetype} + _pointer = getattr(requests,_method) + + _pointer ({self._name:_content},headers=_headers) + + + # label = params['label'] + # row = params ['row'] - if label == 'usable': - self.session['csv'].append(self.format(row,',')) - self.session['sql'].append(self.format_sql(row)) + # if label == 'usable': + # self.session['csv'].append(self.format(row,',')) + # self.session['sql'].append(self.format_sql(row)) diff --git a/transport/sql.py b/transport/sql.py index 3c555f5..019db78 100644 --- a/transport/sql.py +++ b/transport/sql.py @@ -291,17 +291,17 @@ class SQLWriter(SQLRW,Writer): """ # inspect = False if 'inspect' not in _args else _args['inspect'] # cast = False if 'cast' not in _args else _args['cast'] - if not self.fields : - if type(info) == list : - _fields = info[0].keys() - elif type(info) == dict : - _fields = info.keys() - elif type(info) == pd.DataFrame : - _fields = info.columns.tolist() - - # _fields = info.keys() if type(info) == dict else info[0].keys() - _fields = list (_fields) - self.init(_fields) + # if not self.fields : + # if type(info) == list : + # _fields = info[0].keys() + # elif type(info) == dict : + # _fields = info.keys() + # elif type(info) == pd.DataFrame : + # _fields = info.columns.tolist() + + # # _fields = info.keys() if type(info) == dict else info[0].keys() + # # _fields = list (_fields) + # self.init(_fields) try: table = _args['table'] if 'table' in _args else self.table diff --git a/transport/version.py b/transport/version.py index ec087c4..5e7e7b7 100644 --- a/transport/version.py +++ b/transport/version.py @@ -1,2 +1,2 @@ __author__ = 'The Phi Technology' -__version__= '1.8.6' +__version__= '1.9.0' From 2bb07aedec694dc0074bc6f66b0133d29b8d4d0f Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Sat, 30 Sep 2023 00:18:37 -0500 Subject: [PATCH 04/16] bug fix: sqlite and cursors and transport --- bin/transport | 28 ++++++++++----- transport/disk.py | 87 ++++++++++++++++++++++++----------------------- transport/etl.py | 28 ++++++++------- 3 files changed, 80 insertions(+), 63 deletions(-) diff --git a/bin/transport b/bin/transport index 2225f3b..dd424a2 100755 --- a/bin/transport +++ b/bin/transport @@ -46,6 +46,7 @@ import time from multiprocessing import Process import typer import os +import transport from transport import etl from transport import providers @@ -88,7 +89,7 @@ def move (path,index=None): _config = _config[ int(index)] etl.instance(**_config) else: - etl.instance(_config) + etl.instance(config=_config) # # if type(_config) == dict : @@ -109,19 +110,30 @@ def move (path,index=None): # jobs.append(thread()) # if _config.index(_args) == 0 : # thread.join() - wait(jobs) - + # wait(jobs) +@app.command() +def version(): + print (transport.version.__version__) @app.command() def generate (path:str): - __doc__=""" - """ - _config = [{"source":{"provider":"http","url":"https://cdn.wsform.com/wp-content/uploads/2020/06/agreement.csv"},"target":{"provider":"file","path":"addresses.csv","delimiter":"csv"}}] + This function will generate a configuration template to give a sense of how to create one + """ + _config = [ + { + "source":{"provider":"http","url":"https://raw.githubusercontent.com/codeforamerica/ohana-api/master/data/sample-csv/addresses.csv"}, + "target": + [{"provider":"file","path":"addresses.csv","delimiter":"csv"},{"provider":"sqlite","database":"sample.db3","table":"addresses"}] + } + ] file = open(path,'w') file.write(json.dumps(_config)) file.close() - -# if __name__ == '__main__' : +@app.command() +def usage(): + print (__doc__) +if __name__ == '__main__' : + app() # # # # Load information from the file ... # if 'help' in SYS_ARGS : diff --git a/transport/disk.py b/transport/disk.py index 8514e3f..a3880ec 100644 --- a/transport/disk.py +++ b/transport/disk.py @@ -62,34 +62,25 @@ class DiskWriter(Writer): """ THREAD_LOCK = Lock() def __init__(self,**params): - Writer.__init__(self) - self.cache['meta'] = {'cols':0,'rows':0,'delimiter':None} - if 'path' in params: - self.path = params['path'] - else: - self.path = 'data-transport.log' - self.delimiter = params['delimiter'] if 'delimiter' in params else None - # if 'name' in params: - # self.name = params['name']; - # else: - # self.name = 'data-transport.log' - # if os.path.exists(self.path) == False: - # os.mkdir(self.path) - def meta(self): - return self.cache['meta'] - def isready(self): - """ - This function determines if the class is ready for execution or not - i.e it determines if the preconditions of met prior execution - """ - return True - # p = self.path is not None and os.path.exists(self.path) - # q = self.name is not None - # return p and q - def format (self,row): - self.cache['meta']['cols'] += len(row) if isinstance(row,list) else len(row.keys()) - self.cache['meta']['rows'] += 1 - return (self.delimiter.join(row) if self.delimiter else json.dumps(row))+"\n" + super().__init__() + self._path = params['path'] + self._delimiter = params['delimiter'] + + # def meta(self): + # return self.cache['meta'] + # def isready(self): + # """ + # This function determines if the class is ready for execution or not + # i.e it determines if the preconditions of met prior execution + # """ + # return True + # # p = self.path is not None and os.path.exists(self.path) + # # q = self.name is not None + # # return p and q + # def format (self,row): + # self.cache['meta']['cols'] += len(row) if isinstance(row,list) else len(row.keys()) + # self.cache['meta']['rows'] += 1 + # return (self.delimiter.join(row) if self.delimiter else json.dumps(row))+"\n" def write(self,info,**_args): """ This function writes a record to a designated file @@ -97,21 +88,30 @@ class DiskWriter(Writer): @param row row to be written """ try: + _mode = 'a' if 'overwrite' not in _args else 'w' DiskWriter.THREAD_LOCK.acquire() - f = open(self.path,_mode) - if self.delimiter : - if type(info) == list : - for row in info : - f.write(self.format(row)) - else: - f.write(self.format(info)) - else: - if not type(info) == str : - f.write(json.dumps(info)+"\n") - else: - f.write(info) - f.close() + # # _path = _args['path'] if 'path' in _args else self.path + # # _delim= _args['delimiter'] if 'delimiter' in _args else self._delimiter + # # info.to_csv(_path,sep=_delim) + # info.to_csv(self.path) + # f = open(self.path,_mode) + # if self.delimiter : + # if type(info) == list : + # for row in info : + # f.write(self.format(row)) + # else: + # f.write(self.format(info)) + # else: + # if not type(info) == str : + # f.write(json.dumps(info)+"\n") + # else: + # f.write(info) + # f.close() + _delim = self._delimiter if 'delimiter' not in _args else _args['delimiter'] + _path = self.path if 'path' not in _args else _args['path'] + info.to_csv(_path,index=False,sep=_delim) + pass except Exception as e: # # Not sure what should be done here ... @@ -220,16 +220,19 @@ class SQLiteWriter(SQLite,DiskWriter) : # # If the table doesn't exist we should create it # - def write(self,info): + def write(self,info,**_args): """ """ if not self.fields : + if type(info) == pd.DataFrame : + _columns = list(info.columns) self.init(list(info.keys())) if type(info) == dict : info = [info] elif type(info) == pd.DataFrame : + info = info.fillna('') info = info.to_dict(orient='records') SQLiteWriter.LOCK.acquire() diff --git a/transport/etl.py b/transport/etl.py index dac58c4..aa4a73e 100644 --- a/transport/etl.py +++ b/transport/etl.py @@ -70,7 +70,7 @@ class Transporter(Process): # self._onerror = _args['onError'] self._source = _args['source'] self._target = _args['target'] - + # # Let's insure we can support multiple targets self._target = [self._target] if type(self._target) != list else self._target @@ -90,16 +90,18 @@ class Transporter(Process): This function will write a data-frame to a designated data-store, The function is built around a delegation design pattern :data data-frame or object to be written """ - for _target in self._target : - if 'write' not in _target : - _target['context'] = 'write' - _target['lock'] = True - else: - _target['write']['lock'] = True - _writer = transport.factory.instance(**_target) - _writer.write(_data,**_args) - if hasattr(_writer,'close') : - _writer.close() + if _data.shape[0] > 0 : + for _target in self._target : + if 'write' not in _target : + _target['context'] = 'write' + # _target['lock'] = True + else: + # _target['write']['lock'] = True + pass + _writer = transport.factory.instance(**_target) + _writer.write(_data.copy(),**_args) + if hasattr(_writer,'close') : + _writer.close() def write(self,_df,**_args): """ @@ -109,12 +111,12 @@ class Transporter(Process): # _df = self.read() _segments = np.array_split(np.range(_df.shape[0]),SEGMENT_COUNT) if _df.shape[0] > MAX_ROWS else np.array( [np.arange(_df.shape[0])]) # _index = 0 - + for _indexes in _segments : _fwd_args = {} if not _args else _args - self._delegate_write(_df.iloc[_indexes],**_fwd_args) + self._delegate_write(_df.iloc[_indexes]) # # @TODO: Perhaps consider writing up each segment in a thread/process (speeds things up?) pass From 4320159f3d930f74ff84c537d7c62992f6d2ebb1 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Sat, 30 Sep 2023 01:17:35 -0500 Subject: [PATCH 05/16] bug fixes --- transport/disk.py | 29 +++++++---------------------- transport/etl.py | 2 +- 2 files changed, 8 insertions(+), 23 deletions(-) diff --git a/transport/disk.py b/transport/disk.py index a3880ec..1d966c7 100644 --- a/transport/disk.py +++ b/transport/disk.py @@ -65,7 +65,7 @@ class DiskWriter(Writer): super().__init__() self._path = params['path'] self._delimiter = params['delimiter'] - + self._mode = 'w' if 'mode' not in params else params['mode'] # def meta(self): # return self.cache['meta'] # def isready(self): @@ -89,28 +89,13 @@ class DiskWriter(Writer): """ try: - _mode = 'a' if 'overwrite' not in _args else 'w' - DiskWriter.THREAD_LOCK.acquire() - # # _path = _args['path'] if 'path' in _args else self.path - # # _delim= _args['delimiter'] if 'delimiter' in _args else self._delimiter - # # info.to_csv(_path,sep=_delim) - # info.to_csv(self.path) - # f = open(self.path,_mode) - # if self.delimiter : - # if type(info) == list : - # for row in info : - # f.write(self.format(row)) - # else: - # f.write(self.format(info)) - # else: - # if not type(info) == str : - # f.write(json.dumps(info)+"\n") - # else: - # f.write(info) - # f.close() + + DiskWriter.THREAD_LOCK.acquire() + _delim = self._delimiter if 'delimiter' not in _args else _args['delimiter'] - _path = self.path if 'path' not in _args else _args['path'] - info.to_csv(_path,index=False,sep=_delim) + _path = self._path if 'path' not in _args else _args['path'] + _mode = self._mode if 'mode' not in _args else _args['mode'] + info.to_csv(_path,index=False,sep=_delim, mode=_mode) pass except Exception as e: # diff --git a/transport/etl.py b/transport/etl.py index aa4a73e..b2e0e6a 100644 --- a/transport/etl.py +++ b/transport/etl.py @@ -116,7 +116,7 @@ class Transporter(Process): for _indexes in _segments : _fwd_args = {} if not _args else _args - self._delegate_write(_df.iloc[_indexes]) + self._delegate_write(_df.iloc[_indexes],**_fwd_args) # # @TODO: Perhaps consider writing up each segment in a thread/process (speeds things up?) pass From a7fe357b2c4687cc736bcaaf56a7de71de6fa162 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Sat, 30 Sep 2023 09:24:58 -0500 Subject: [PATCH 06/16] bug fixes, write file and other misc with ETL --- transport/disk.py | 2 +- transport/etl.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/transport/disk.py b/transport/disk.py index 1d966c7..d8ee757 100644 --- a/transport/disk.py +++ b/transport/disk.py @@ -95,7 +95,7 @@ class DiskWriter(Writer): _delim = self._delimiter if 'delimiter' not in _args else _args['delimiter'] _path = self._path if 'path' not in _args else _args['path'] _mode = self._mode if 'mode' not in _args else _args['mode'] - info.to_csv(_path,index=False,sep=_delim, mode=_mode) + info.to_csv(_path,index=False,sep=_delim) pass except Exception as e: # diff --git a/transport/etl.py b/transport/etl.py index b2e0e6a..198bdf0 100644 --- a/transport/etl.py +++ b/transport/etl.py @@ -99,7 +99,7 @@ class Transporter(Process): # _target['write']['lock'] = True pass _writer = transport.factory.instance(**_target) - _writer.write(_data.copy(),**_args) + _writer.write(_data,**_args) if hasattr(_writer,'close') : _writer.close() From 5660d8ba593f34a677759b575b58436dfef8a53f Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Sat, 11 Nov 2023 10:30:58 -0600 Subject: [PATCH 07/16] nextcloud handling --- transport/disk.py | 2 +- transport/nextcloud.py | 76 ++++++++++++++++++++++++++++++++++++++++++ transport/providers.py | 18 ++++++---- transport/version.py | 2 +- 4 files changed, 90 insertions(+), 8 deletions(-) create mode 100644 transport/nextcloud.py diff --git a/transport/disk.py b/transport/disk.py index d8ee757..f092a3d 100644 --- a/transport/disk.py +++ b/transport/disk.py @@ -64,7 +64,7 @@ class DiskWriter(Writer): def __init__(self,**params): super().__init__() self._path = params['path'] - self._delimiter = params['delimiter'] + self._delimiter = params['delimiter'] if 'delimiter' in params else None self._mode = 'w' if 'mode' not in params else params['mode'] # def meta(self): # return self.cache['meta'] diff --git a/transport/nextcloud.py b/transport/nextcloud.py new file mode 100644 index 0000000..457eb83 --- /dev/null +++ b/transport/nextcloud.py @@ -0,0 +1,76 @@ +""" +We are implementing transport to and from nextcloud (just like s3) +""" +import os +import sys +from transport.common import Reader,Writer +import pandas as pd +from io import StringIO +import json +import nextcloud_client as nextcloud + +class Nextcloud : + def __init__(self,**_args): + pass + self._delimiter = None + self._handler = nextcloud.Client(_args['url']) + _uid = _args['uid'] + _token = _args['token'] + self._uri = _args['folder'] if 'folder' in _args else './' + if self._uri.endswith('/') : + self._uri = self._uri[:-1] + self._file = None if 'file' not in _args else _args['file'] + self._handler.login(_uid,_token) + def close(self): + try: + self._handler.logout() + except Exception as e: + pass + + +class NextcloudReader(Nextcloud,Reader): + def __init__(self,**_args): + # self._file = [] if 'file' not in _args else _args['file'] + super().__init__(**_args) + pass + def read(self,**_args): + _filename = self._file if 'file' not in _args else _args['file'] + # + # @TODO: if _filename is none, an exception should be raised + # + _uri = '/'.join([self._uri,_filename]) + if self._handler.get_file(_uri) : + # + # + _info = self._handler.file_info(_uri) + _content = self._handler.get_file_contents(_uri).decode('utf8') + if _info.get_content_type() == 'text/csv' : + _file = StringIO(_content) + return pd.read_csv(_file) + else: + return _content + return None +class NextcloudWriter (Nextcloud,Writer): + """ + This class will write data to an instance of nextcloud + """ + def __init__(self,**_args) : + super().__init__(**_args) + self + def write(self,_data,**_args): + """ + This function will upload a file to a given destination + :file has the uri of the location of the file + """ + _filename = self._file if 'file' not in _args else _args['file'] + _uri = '/'.join([self._uri,_filename]) + if type(_data) == pd.DataFrame : + f = StringIO() + _data.to_csv(f,index=False) + _content = f.getvalue() + elif type(_data) == dict : + _content = json.dumps(_data) + else: + _content = str(_data) + self._handler.put_file_contents(_uri,_content) + diff --git a/transport/providers.py b/transport/providers.py index fc394f3..a798960 100644 --- a/transport/providers.py +++ b/transport/providers.py @@ -10,6 +10,7 @@ from transport import etl as etl from transport import qlistener from transport import bricks from transport import session +from transport import nextcloud import psycopg2 as pg import mysql.connector as my from google.cloud import bigquery as bq @@ -34,7 +35,7 @@ MARIADB = 'mariadb' COUCHDB = 'couch' CONSOLE = 'console' ETL = 'etl' - +NEXTCLOUD = 'nextcloud' # # synonyms of the above @@ -49,18 +50,19 @@ AWS_S3 = 's3' RABBIT = RABBITMQ QLISTENER = 'qlistener' +QUEUE = QLISTENER DATABRICKS= 'databricks+connector' DRIVERS = {PG:pg,REDSHIFT:pg,MYSQL:my,MARIADB:my,NETEZZA:nz,SQLITE:sqlite3} -CATEGORIES ={'sql':[NETEZZA,PG,MYSQL,REDSHIFT,SQLITE,MARIADB],'nosql':[MONGODB,COUCHDB],'cloud':[BIGQUERY,DATABRICKS],'file':[FILE], - 'queue':[RABBIT,QLISTENER],'memory':[CONSOLE,QLISTENER],'http':[HTTP]} +CATEGORIES ={'sql':[NETEZZA,PG,MYSQL,REDSHIFT,SQLITE,MARIADB],'nosql':[MONGODB,COUCHDB],'cloud':[NEXTCLOUD,S3,BIGQUERY,DATABRICKS],'file':[FILE], + 'queue':[RABBIT,QLISTENER],'memory':[CONSOLE,QUEUE],'http':[HTTP]} READ = {'sql':sql.SQLReader,'nosql':{MONGODB:mongo.MongoReader,COUCHDB:couch.CouchReader}, - 'cloud':{BIGQUERY:sql.BigQueryReader,DATABRICKS:bricks.BricksReader}, + 'cloud':{BIGQUERY:sql.BigQueryReader,DATABRICKS:bricks.BricksReader,NEXTCLOUD:nextcloud.NextcloudReader}, 'file':disk.DiskReader,'queue':{RABBIT:queue.QueueReader,QLISTENER:qlistener.qListener}, # 'cli':{CONSOLE:Console},'memory':{CONSOLE:Console},'http':session.HttpReader } WRITE = {'sql':sql.SQLWriter,'nosql':{MONGODB:mongo.MongoWriter,COUCHDB:couch.CouchWriter}, - 'cloud':{BIGQUERY:sql.BigQueryWriter,DATABRICKS:bricks.BricksWriter}, + 'cloud':{BIGQUERY:sql.BigQueryWriter,DATABRICKS:bricks.BricksWriter,NEXTCLOUD:nextcloud.NextcloudWriter}, 'file':disk.DiskWriter,'queue':{RABBIT:queue.QueueWriter,QLISTENER:qlistener.qListener}, # 'cli':{CONSOLE:Console}, # 'memory':{CONSOLE:Console}, 'http':session.HttpReader @@ -78,12 +80,16 @@ PROVIDERS = { MYSQL:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':my,'default':{'host':'localhost','port':3306}}, MARIADB:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':my,'default':{'host':'localhost','port':3306}}, + S3:{'read':s3.s3Reader,'write':s3.s3Writer}, BIGQUERY:{'read':sql.BigQueryReader,'write':sql.BigQueryWriter}, + DATABRICKS:{'read':bricks.BricksReader,'write':bricks.BricksWriter}, + NEXTCLOUD:{'read':nextcloud.NextcloudReader,'write':nextcloud.NextcloudWriter}, + QLISTENER:{'read':qlistener.qListener,'write':qlistener.qListener,'default':{'host':'localhost','port':5672}}, CONSOLE:{'read':qlistener.Console,"write":qlistener.Console}, HTTP:{'read':session.HttpReader,'write':session.HttpWriter}, - DATABRICKS:{'read':bricks.BricksReader,'write':bricks.BricksWriter}, + MONGODB:{'read':mongo.MongoReader,'write':mongo.MongoWriter,'default':{'port':27017,'host':'localhost'}}, COUCHDB:{'read':couch.CouchReader,'writer':couch.CouchWriter,'default':{'host':'localhost','port':5984}}, ETL :{'read':etl.Transporter,'write':etl.Transporter} diff --git a/transport/version.py b/transport/version.py index 5e7e7b7..3fa6e8d 100644 --- a/transport/version.py +++ b/transport/version.py @@ -1,2 +1,2 @@ __author__ = 'The Phi Technology' -__version__= '1.9.0' +__version__= '1.9.2' From 0930eb0f5c2378a4b9f09616f46e8072e364c02c Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Sat, 11 Nov 2023 10:37:32 -0600 Subject: [PATCH 08/16] nextcloud dependency pyncclient --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 254bb5c..c322c38 100644 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ args = { "license":"MIT", "packages":["transport"]} args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite'] -args["install_requires"] = ['pymongo','sqlalchemy<2.0.0','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python'] +args["install_requires"] = ['pyncclient','pymongo','sqlalchemy<2.0.0','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python'] args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git" args['scripts'] = ['bin/transport'] if sys.version_info[0] == 2 : From 7d29a69a232dec91357c69c020dfa70ec79be037 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Tue, 21 Nov 2023 11:01:05 -0600 Subject: [PATCH 09/16] reminder --- transport/nextcloud.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/transport/nextcloud.py b/transport/nextcloud.py index 457eb83..f096f70 100644 --- a/transport/nextcloud.py +++ b/transport/nextcloud.py @@ -45,9 +45,13 @@ class NextcloudReader(Nextcloud,Reader): _info = self._handler.file_info(_uri) _content = self._handler.get_file_contents(_uri).decode('utf8') if _info.get_content_type() == 'text/csv' : + # + # @TODO: enable handling of csv, xls, parquet, pickles _file = StringIO(_content) return pd.read_csv(_file) else: + # + # if it is neither a structured document like csv, we will return the content as is return _content return None class NextcloudWriter (Nextcloud,Writer): From 9da2894b07fb47b583382173433e8ac579de9855 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Thu, 30 Nov 2023 12:05:04 -0600 Subject: [PATCH 10/16] bug fixes with sqlite and provider --- transport/disk.py | 15 +++++++++------ transport/providers.py | 3 ++- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/transport/disk.py b/transport/disk.py index f092a3d..956386d 100644 --- a/transport/disk.py +++ b/transport/disk.py @@ -209,17 +209,20 @@ class SQLiteWriter(SQLite,DiskWriter) : """ """ - if not self.fields : - if type(info) == pd.DataFrame : - _columns = list(info.columns) - self.init(list(info.keys())) + #if not self.fields : + # #if type(info) == pd.DataFrame : + # # _columns = list(info.columns) + # #self.init(list(info.keys())) if type(info) == dict : info = [info] elif type(info) == pd.DataFrame : info = info.fillna('') info = info.to_dict(orient='records') - + if not self.fields : + _rec = info[0] + self.init(list(_rec.keys())) + SQLiteWriter.LOCK.acquire() try: @@ -238,4 +241,4 @@ class SQLiteWriter(SQLite,DiskWriter) : except Exception as e : print (e) pass - SQLiteWriter.LOCK.release() \ No newline at end of file + SQLiteWriter.LOCK.release() diff --git a/transport/providers.py b/transport/providers.py index a798960..23843e7 100644 --- a/transport/providers.py +++ b/transport/providers.py @@ -72,6 +72,7 @@ WRITE = {'sql':sql.SQLWriter,'nosql':{MONGODB:mongo.MongoWriter,COUCHDB:couch.Co PROVIDERS = { FILE:{'read':disk.DiskReader,'write':disk.DiskWriter}, SQLITE:{'read':disk.SQLiteReader,'write':disk.SQLiteWriter,'driver':sqlite3}, + 'sqlite3':{'read':disk.SQLiteReader,'write':disk.SQLiteWriter,'driver':sqlite3}, POSTGRESQL:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':pg,'default':{'host':'localhost','port':5432}}, NETEZZA:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':nz,'default':{'port':5480}}, @@ -98,4 +99,4 @@ DEFAULT = {PG:{'host':'localhost','port':5432},MYSQL:{'host':'localhost','port': DEFAULT[MONGODB] = {'port':27017,'host':'localhost'} DEFAULT[REDSHIFT] = DEFAULT[PG] DEFAULT[MARIADB] = DEFAULT[MYSQL] -DEFAULT[NETEZZA] = {'port':5480} \ No newline at end of file +DEFAULT[NETEZZA] = {'port':5480} From 214f276ae430f02679a458c4fb563e6d0b0a55f4 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Thu, 30 Nov 2023 12:05:44 -0600 Subject: [PATCH 11/16] bug fixes with sqlite and provider --- transport/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transport/version.py b/transport/version.py index 3fa6e8d..2b34f5b 100644 --- a/transport/version.py +++ b/transport/version.py @@ -1,2 +1,2 @@ __author__ = 'The Phi Technology' -__version__= '1.9.2' +__version__= '1.9.4' From 5a5922b736f15f6a1228d037ca2a199873365768 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Thu, 30 Nov 2023 12:12:14 -0600 Subject: [PATCH 12/16] version update --- transport/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transport/version.py b/transport/version.py index 2b34f5b..3fa6e8d 100644 --- a/transport/version.py +++ b/transport/version.py @@ -1,2 +1,2 @@ __author__ = 'The Phi Technology' -__version__= '1.9.4' +__version__= '1.9.2' From 56bdda17b70b23c0b0af4b413432bc65b43a3654 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Thu, 30 Nov 2023 12:40:57 -0600 Subject: [PATCH 13/16] minor bug fix --- transport/disk.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/transport/disk.py b/transport/disk.py index 956386d..42b5b33 100644 --- a/transport/disk.py +++ b/transport/disk.py @@ -219,9 +219,10 @@ class SQLiteWriter(SQLite,DiskWriter) : elif type(info) == pd.DataFrame : info = info.fillna('') info = info.to_dict(orient='records') - if not self.fields : - _rec = info[0] - self.init(list(_rec.keys())) + + if not self.fields : + _rec = info[0] + self.init(list(_rec.keys())) SQLiteWriter.LOCK.acquire() try: From d74372f645630fb100a4cb7d2afa2c421b426df4 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Fri, 8 Dec 2023 18:19:46 -0600 Subject: [PATCH 14/16] bug fixes: mongodb, common, nextcloud --- transport/__init__.py | 24 ++++++++++++++---------- transport/common.py | 15 ++++++++++++++- transport/disk.py | 7 ++++++- transport/mongo.py | 6 +++--- transport/nextcloud.py | 4 ++-- 5 files changed, 39 insertions(+), 17 deletions(-) diff --git a/transport/__init__.py b/transport/__init__.py index e139aa5..234c418 100644 --- a/transport/__init__.py +++ b/transport/__init__.py @@ -27,6 +27,7 @@ import json import importlib import sys import sqlalchemy +from datetime import datetime if sys.version_info[0] > 2 : # from transport.common import Reader, Writer,Console #, factory from transport import disk @@ -83,16 +84,19 @@ import os # PGSQL = POSTGRESQL # import providers -class IEncoder (json.JSONEncoder): - def default (self,object): - if type(object) == np.integer : - return int(object) - elif type(object) == np.floating: - return float(object) - elif type(object) == np.ndarray : - return object.tolist() - else: - return super(IEncoder,self).default(object) +# class IEncoder (json.JSONEncoder): +def IEncoder (self,object): + if type(object) == np.integer : + return int(object) + elif type(object) == np.floating: + return float(object) + elif type(object) == np.ndarray : + return object.tolist() + elif type(object) == datetime : + return o.isoformat() + else: + return super(IEncoder,self).default(object) + class factory : TYPE = {"sql":{"providers":["postgresql","mysql","neteeza","bigquery","mariadb","redshift"]}} PROVIDERS = { diff --git a/transport/common.py b/transport/common.py index 59f57ea..8b9f718 100644 --- a/transport/common.py +++ b/transport/common.py @@ -25,7 +25,7 @@ from multiprocessing import RLock import queue # import couch # import mongo - +from datetime import datetime class IO: def init(self,**args): @@ -39,6 +39,19 @@ class IO: continue value = args[field] setattr(self,field,value) +class IEncoder (json.JSONEncoder): + def default (self,object): + if type(object) == np.integer : + return int(object) + elif type(object) == np.floating: + return float(object) + elif type(object) == np.ndarray : + return object.tolist() + elif type(object) == datetime : + return object.isoformat() + else: + return super(IEncoder,self).default(object) + class Reader (IO): """ This class is an abstraction of a read functionalities of a data store diff --git a/transport/disk.py b/transport/disk.py index 42b5b33..2c9f6c8 100644 --- a/transport/disk.py +++ b/transport/disk.py @@ -12,6 +12,8 @@ import json import sqlite3 import pandas as pd from multiprocessing import Lock +from transport.common import Reader, Writer, IEncoder + class DiskReader(Reader) : """ This class is designed to read data from disk (location on hard drive) @@ -221,6 +223,8 @@ class SQLiteWriter(SQLite,DiskWriter) : info = info.to_dict(orient='records') if not self.fields : + + _rec = info[0] self.init(list(_rec.keys())) @@ -231,7 +235,8 @@ class SQLiteWriter(SQLite,DiskWriter) : sql = " " .join(["INSERT INTO ",self.table,"(", ",".join(self.fields) ,")", "values(:values)"]) for row in info : stream =["".join(["",value,""]) if type(value) == str else value for value in row.values()] - stream = json.dumps(stream).replace("[","").replace("]","") + stream = json.dumps(stream,cls=IEncoder) + stream = stream.replace("[","").replace("]","") self.conn.execute(sql.replace(":values",stream) ) diff --git a/transport/mongo.py b/transport/mongo.py index c24b4b8..bac1780 100644 --- a/transport/mongo.py +++ b/transport/mongo.py @@ -15,7 +15,7 @@ import gridfs # from transport import Reader,Writer import sys if sys.version_info[0] > 2 : - from transport.common import Reader, Writer + from transport.common import Reader, Writer, IEncoder else: from common import Reader, Writer import json @@ -102,7 +102,7 @@ class MongoReader(Mongo,Reader): if 'pipeline' in args : cmd['pipeline']= args['pipeline'] if 'aggregate' not in cmd : - cmd['aggregate'] = self.collection + cmd['aggregate'] = self.uid if 'pipeline' not in args or 'aggregate' not in cmd : cmd = args['mongo'] if 'mongo' in args else args['cmd'] if "aggregate" in cmd : @@ -182,7 +182,7 @@ class MongoWriter(Mongo,Writer): for row in rows : if type(row['_id']) == ObjectId : row['_id'] = str(row['_id']) - stream = Binary(json.dumps(collection).encode()) + stream = Binary(json.dumps(collection,cls=IEncoder).encode()) collection.delete_many({}) now = "-".join([str(datetime.now().year()),str(datetime.now().month), str(datetime.now().day)]) name = ".".join([self.uid,'archive',now])+".json" diff --git a/transport/nextcloud.py b/transport/nextcloud.py index f096f70..2eefd51 100644 --- a/transport/nextcloud.py +++ b/transport/nextcloud.py @@ -3,7 +3,7 @@ We are implementing transport to and from nextcloud (just like s3) """ import os import sys -from transport.common import Reader,Writer +from transport.common import Reader,Writer, IEncoder import pandas as pd from io import StringIO import json @@ -73,7 +73,7 @@ class NextcloudWriter (Nextcloud,Writer): _data.to_csv(f,index=False) _content = f.getvalue() elif type(_data) == dict : - _content = json.dumps(_data) + _content = json.dumps(_data,cls=IEncoder) else: _content = str(_data) self._handler.put_file_contents(_uri,_content) From e46ebadcc2c0a383d838c83e6ddba7a82dae8162 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Mon, 11 Dec 2023 22:10:53 -0600 Subject: [PATCH 15/16] bug fix --- transport/disk.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/transport/disk.py b/transport/disk.py index 2c9f6c8..424e95e 100644 --- a/transport/disk.py +++ b/transport/disk.py @@ -223,8 +223,6 @@ class SQLiteWriter(SQLite,DiskWriter) : info = info.to_dict(orient='records') if not self.fields : - - _rec = info[0] self.init(list(_rec.keys())) From fbfaaebbdc4300c137a0209d1a83f9ebecc6ac21 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Wed, 20 Dec 2023 10:14:15 -0600 Subject: [PATCH 16/16] adding alias CALLBACK == QLISTENER --- transport/providers.py | 1 + 1 file changed, 1 insertion(+) diff --git a/transport/providers.py b/transport/providers.py index 23843e7..93b6f53 100644 --- a/transport/providers.py +++ b/transport/providers.py @@ -51,6 +51,7 @@ RABBIT = RABBITMQ QLISTENER = 'qlistener' QUEUE = QLISTENER +CALLBACK = QLISTENER DATABRICKS= 'databricks+connector' DRIVERS = {PG:pg,REDSHIFT:pg,MYSQL:my,MARIADB:my,NETEZZA:nz,SQLITE:sqlite3} CATEGORIES ={'sql':[NETEZZA,PG,MYSQL,REDSHIFT,SQLITE,MARIADB],'nosql':[MONGODB,COUCHDB],'cloud':[NEXTCLOUD,S3,BIGQUERY,DATABRICKS],'file':[FILE],