From 66374058988745dba912f3a74a141927d4aa0b65 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Tue, 28 Jan 2025 13:53:24 -0600 Subject: [PATCH] bug fix: issue with sqlalchemy & python 3.12 --- bin/transport | 49 ++++++++++++++------------ info/__init__.py | 2 +- transport/sql/common.py | 78 ++++++++++++++++++++++------------------- 3 files changed, 68 insertions(+), 61 deletions(-) diff --git a/bin/transport b/bin/transport index 97332ec..eb8b17a 100755 --- a/bin/transport +++ b/bin/transport @@ -33,14 +33,17 @@ from typing import Optional import time from termcolor import colored from enum import Enum -from typing import Tuple +from rich import print app = typer.Typer() -app_x = typer.Typer() +app_e = typer.Typer() #-- handles etl (run, generate) +app_x = typer.Typer() #-- handles plugins (list,add, test) +app_i = typer.Typer() #-- handles information (version, license) +app_r = typer.Typer() #-- handles registry REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport']) REGISTRY_FILE= 'transport-registry.json' -CHECK_MARK = ' '.join(['[',colored(u'\u2713', 'green'),']']) -TIMES_MARK= ' '.join(['[',colored(u'\u2717','red'),']']) +CHECK_MARK = '[ [green]\u2713[/green] ]' #' '.join(['[',colored(u'\u2713', 'green'),']']) +TIMES_MARK= '[ [red]\u2717[/red] ]' #' '.join(['[',colored(u'\u2717','red'),']']) # @app.command() def help() : print (__doc__) @@ -52,7 +55,7 @@ def wait (jobs): while jobs : jobs = [pthread for pthread in jobs if pthread.is_alive()] -@app.command(name="etl") +@app_e.command(name="run") def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")], index:int = typer.Option(default= None, help="index of the item of interest, otherwise everything in the file will be processed"), batch:int = typer.Option(default=5, help="The number of parallel processes to run at once") @@ -89,10 +92,10 @@ def apply (path:Annotated[str,typer.Argument(help="path of the configuration fil # time.sleep(1) # # @TODO: Log the job termination here ... -@app.command(name="providers") +@app_i.command(name="supported") def supported (format:Annotated[str,typer.Argument(help="format of the output, supported formats are (list,table,json)")]="table") : """ - This function will print supported providers/vendors and their associated classifications + This function will print supported database technologies """ _df = (transport.supported()) if format in ['list','json'] : @@ -101,17 +104,17 @@ def supported (format:Annotated[str,typer.Argument(help="format of the output, s print (_df) print () -@app.command() -def version(): +@app_i.command(name="license") +def info(): """ This function will display version and license information """ - print (transport.__app_name__,'version ',transport.__version__) + print (f'[bold] {transport.__app_name__} ,version {transport.__version__}[/bold]') print () print (transport.__license__) -@app.command() +@app_e.command() def generate (path:Annotated[str,typer.Argument(help="path of the ETL configuration file template (name included)")]): """ This function will generate a configuration template to give a sense of how to create one @@ -126,12 +129,12 @@ def generate (path:Annotated[str,typer.Argument(help="path of the ETL configurat file = open(path,'w') file.write(json.dumps(_config)) file.close() - print (f"""{CHECK_MARK} Successfully generated a template ETL file at {path}""" ) + print (f"""{CHECK_MARK} Successfully generated a template ETL file at [bold]{path}[/bold]""" ) print ("""NOTE: Each line (source or target) is the content of an auth-file""") -@app.command(name="init") +@app_r.command(name="reset") def initregistry (email:Annotated[str,typer.Argument(help="email")], path:str=typer.Option(default=REGISTRY_PATH,help="path or location of the configuration file"), override:bool=typer.Option(default=False,help="override existing configuration or not")): @@ -141,26 +144,24 @@ def initregistry (email:Annotated[str,typer.Argument(help="email")], """ try: transport.registry.init(email=email, path=path, override=override) - _msg = f"""{CHECK_MARK} Successfully wrote configuration to {path} from {email}""" + _msg = f"""{CHECK_MARK} Successfully wrote configuration to [bold]{path}[/bold] from [bold]{email}[/bold]""" except Exception as e: _msg = f"{TIMES_MARK} {e}" print (_msg) print () -@app.command(name="register") +@app_r.command(name="add") def register (label:Annotated[str,typer.Argument(help="unique label that will be used to load the parameters of the database")], auth_file:Annotated[str,typer.Argument(help="path of the auth_file")], default:bool=typer.Option(default=False,help="set the auth_file as default"), path:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry file")): """ - This function create a registery of either: - an auth-file entries given an auth-file i.e database connection and assign it a label, - Learn more about auth-file at https://healthcareio.the-phi.com/data-transport + This function add a database label for a given auth-file. which allows access to the database using a label of your choice. """ try: if transport.registry.exists(path) : transport.registry.set(label=label,auth_file=auth_file, default=default, path=path) - _msg = f"""{CHECK_MARK} Successfully added label "{label}" to data-transport registry""" + _msg = f"""{CHECK_MARK} Successfully added label [bold]"{label}"[/bold] to data-transport registry""" else: _msg = f"""{TIMES_MARK} Registry is not initialized, please initialize the registry (check help)""" except Exception as e: @@ -179,7 +180,7 @@ def register_plugs ( transport.registry.plugins.init() _log = transport.registry.plugins.add(alias,path) _mark = TIMES_MARK if not _log else CHECK_MARK - _msg = f"""Could NOT add the \033[1m{alias}\033[0m to the registry""" if not _log else f""" successfully added {alias}, {len(_log)} functions added""" + _msg = f"""Could NOT add the [bold]{alias}[/bold]to the registry""" if not _log else f""" successfully added {alias}, {len(_log)} functions added""" print (f"""{_mark} {_msg}""") @app_x.command(name="list") def registry_list (): @@ -197,7 +198,7 @@ def registry_list (): @app_x.command(name="test") def registry_test (key): """ - This function allows to test syntax for a plugin i.e in terms of alis@function + This function allows to test syntax for a plugin i.e in terms of alias@function """ _item = transport.registry.plugins.has(key=key) if _item : @@ -206,8 +207,10 @@ def registry_test (key): print (pd.DataFrame([_item])) else: print (f"{TIMES_MARK} unable to load \033[1m{key}\033[0m. Make sure it is registered") - -app.add_typer(app_x, name="plugins") +app.add_typer(app_e,name='etl',help="This function will run etl or generate a template etl configuration file") +app.add_typer(app_r,name='registry',help='This function allows labeling database access information') +app.add_typer(app_i,name="info",help="This function will print either license or supported database technologies") +app.add_typer(app_x, name="plugins",help="This function enables add/list/test of plugins in the registry") if __name__ == '__main__' : app() diff --git a/info/__init__.py b/info/__init__.py index 56c7273..092b385 100644 --- a/info/__init__.py +++ b/info/__init__.py @@ -1,6 +1,6 @@ __app_name__ = 'data-transport' __author__ = 'The Phi Technology' -__version__= '2.4.12' +__version__= '2.4.14' __email__ = "info@the-phi.com" __license__=f""" Copyright 2010 - 2024, Steve L. Nyemba diff --git a/transport/sql/common.py b/transport/sql/common.py index 0b9a7e4..a6a1a0f 100644 --- a/transport/sql/common.py +++ b/transport/sql/common.py @@ -3,21 +3,17 @@ This file encapsulates common operations associated with SQL databases via SQLAl """ import sqlalchemy as sqa -from sqlalchemy import text +from sqlalchemy import text , MetaData, Table import pandas as pd - class Base: - __template__={"host":None,"port":1,"database":None,"table":None,"username":None,"password":None} def __init__(self,**_args): - # print ([' ## ',_args]) self._host = _args['host'] if 'host' in _args else 'localhost' - self._port = None if 'port' not in _args else _args['port'] + self._port = None self._database = _args['database'] self._table = _args['table'] if 'table' in _args else None self._engine= sqa.create_engine(self._get_uri(**_args),future=True) - self._chunksize = 0 if 'chunksize' not in _args else _args['chunksize'] def _set_uri(self,**_args) : """ :provider provider @@ -38,20 +34,33 @@ class Base: :table optional name of the table (can be fully qualified) """ _table = self._table if 'table' not in _args else _args['table'] + _map = {'BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'} _schema = [] - if _table : - if sqa.__version__.startswith('1.') : - _handler = sqa.MetaData(bind=self._engine) - _handler.reflect() - else: - # - # sqlalchemy's version 2.+ - _handler = sqa.MetaData() - _handler.reflect(bind=self._engine) - # - # Let us extract the schema with the native types - _map = {'BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'} - _schema = [{"name":_attr.name,"type":_map.get(str(_attr.type),str(_attr.type))} for _attr in _handler.tables[_table].columns] + # if _table : + # if sqa.__version__.startswith('1.') : + # _handler = sqa.MetaData(bind=self._engine) + # _handler.reflect() + # else: + # # + # # sqlalchemy's version 2.+ + # _handler = sqa.MetaData() + # _handler.reflect(bind=self._engine) + # # + # # Let us extract the schema with the native types + # _map = {'BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'} + # _schema = [{"name":_attr.name,"type":_map.get(str(_attr.type),str(_attr.type))} for _attr in _handler.tables[_table].columns] + # + + # Step 2: Create a MetaData object + metadata = MetaData() + + # Step 3: Reflect the table + table = Table(_table, metadata, autoload_with=self._engine) + + # Step 4: Print the column names and their types + _schema = [{"name":column.name,"type":_map.get(str(column.type),str(column.type))} for column in table.columns] + # _schema + # print(f"name: {column.name, Type: {column.type}") return _schema def has(self,**_args): return self.meta(**_args) @@ -62,14 +71,9 @@ class Base: @TODO: Execution of stored procedures """ - - if sql.lower().replace('\n',' ').strip().startswith('select') or sql.lower().startswith('with') : - if self._chunksize : - - return pd.read_sql(sql,self._engine,chunksize=self._chunksize) - else: - - return pd.read_sql(sql,self._engine) + if sql.lower().startswith('select') or sql.lower().startswith('with') : + + return pd.read_sql(sql,self._engine) else: _handler = self._engine.connect() _handler.execute(text(sql)) @@ -114,11 +118,9 @@ class BaseReader(SQLBase): if 'sql' in _args : sql = _args['sql'] else: - # print (dir (self)) _table = _args['table'] if 'table' in _args else self._table sql = f'SELECT * FROM {_table}' - - return self.apply(sql.replace('\n',' ').strip()) + return self.apply(sql) class BaseWriter (SQLBase): @@ -128,21 +130,23 @@ class BaseWriter (SQLBase): def __init__(self,**_args): super().__init__(**_args) def write(self,_data,**_args): - if type(_data) in [list,dict] : + if type(_data) == dict : + _df = pd.DataFrame(_data) + elif type(_data) == list : _df = pd.DataFrame(_data) - # elif type(_data) == list : - # _df = pd.DataFrame(_data) else: _df = _data.copy() # # We are assuming we have a data-frame at this point # _table = _args['table'] if 'table' in _args else self._table - _mode = {'if_exists':'append','index':False} - if self._chunksize : - _mode['chunksize'] = self._chunksize + _mode = {'chunksize':2000000,'if_exists':'append','index':False} for key in ['if_exists','index','chunksize'] : if key in _args : _mode[key] = _args[key] - + # if 'schema' in _args : + # _mode['schema'] = _args['schema'] + # if 'if_exists' in _args : + # _mode['if_exists'] = _args['if_exists'] + _df.to_sql(_table,self._engine,**_mode) \ No newline at end of file