bug fix: issue with sqlalchemy & python 3.12

v2.4
Steve Nyemba 2 months ago
parent e82145690b
commit 6637405898

@ -33,14 +33,17 @@ from typing import Optional
import time import time
from termcolor import colored from termcolor import colored
from enum import Enum from enum import Enum
from typing import Tuple from rich import print
app = typer.Typer() app = typer.Typer()
app_x = typer.Typer() app_e = typer.Typer() #-- handles etl (run, generate)
app_x = typer.Typer() #-- handles plugins (list,add, test)
app_i = typer.Typer() #-- handles information (version, license)
app_r = typer.Typer() #-- handles registry
REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport']) REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport'])
REGISTRY_FILE= 'transport-registry.json' REGISTRY_FILE= 'transport-registry.json'
CHECK_MARK = ' '.join(['[',colored(u'\u2713', 'green'),']']) CHECK_MARK = '[ [green]\u2713[/green] ]' #' '.join(['[',colored(u'\u2713', 'green'),']'])
TIMES_MARK= ' '.join(['[',colored(u'\u2717','red'),']']) TIMES_MARK= '[ [red]\u2717[/red] ]' #' '.join(['[',colored(u'\u2717','red'),']'])
# @app.command() # @app.command()
def help() : def help() :
print (__doc__) print (__doc__)
@ -52,7 +55,7 @@ def wait (jobs):
while jobs : while jobs :
jobs = [pthread for pthread in jobs if pthread.is_alive()] jobs = [pthread for pthread in jobs if pthread.is_alive()]
@app.command(name="etl") @app_e.command(name="run")
def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")], def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")],
index:int = typer.Option(default= None, help="index of the item of interest, otherwise everything in the file will be processed"), index:int = typer.Option(default= None, help="index of the item of interest, otherwise everything in the file will be processed"),
batch:int = typer.Option(default=5, help="The number of parallel processes to run at once") batch:int = typer.Option(default=5, help="The number of parallel processes to run at once")
@ -89,10 +92,10 @@ def apply (path:Annotated[str,typer.Argument(help="path of the configuration fil
# time.sleep(1) # time.sleep(1)
# #
# @TODO: Log the job termination here ... # @TODO: Log the job termination here ...
@app.command(name="providers") @app_i.command(name="supported")
def supported (format:Annotated[str,typer.Argument(help="format of the output, supported formats are (list,table,json)")]="table") : def supported (format:Annotated[str,typer.Argument(help="format of the output, supported formats are (list,table,json)")]="table") :
""" """
This function will print supported providers/vendors and their associated classifications This function will print supported database technologies
""" """
_df = (transport.supported()) _df = (transport.supported())
if format in ['list','json'] : if format in ['list','json'] :
@ -101,17 +104,17 @@ def supported (format:Annotated[str,typer.Argument(help="format of the output, s
print (_df) print (_df)
print () print ()
@app.command() @app_i.command(name="license")
def version(): def info():
""" """
This function will display version and license information This function will display version and license information
""" """
print (transport.__app_name__,'version ',transport.__version__) print (f'[bold] {transport.__app_name__} ,version {transport.__version__}[/bold]')
print () print ()
print (transport.__license__) print (transport.__license__)
@app.command() @app_e.command()
def generate (path:Annotated[str,typer.Argument(help="path of the ETL configuration file template (name included)")]): def generate (path:Annotated[str,typer.Argument(help="path of the ETL configuration file template (name included)")]):
""" """
This function will generate a configuration template to give a sense of how to create one This function will generate a configuration template to give a sense of how to create one
@ -126,12 +129,12 @@ def generate (path:Annotated[str,typer.Argument(help="path of the ETL configurat
file = open(path,'w') file = open(path,'w')
file.write(json.dumps(_config)) file.write(json.dumps(_config))
file.close() file.close()
print (f"""{CHECK_MARK} Successfully generated a template ETL file at {path}""" ) print (f"""{CHECK_MARK} Successfully generated a template ETL file at [bold]{path}[/bold]""" )
print ("""NOTE: Each line (source or target) is the content of an auth-file""") print ("""NOTE: Each line (source or target) is the content of an auth-file""")
@app.command(name="init") @app_r.command(name="reset")
def initregistry (email:Annotated[str,typer.Argument(help="email")], def initregistry (email:Annotated[str,typer.Argument(help="email")],
path:str=typer.Option(default=REGISTRY_PATH,help="path or location of the configuration file"), path:str=typer.Option(default=REGISTRY_PATH,help="path or location of the configuration file"),
override:bool=typer.Option(default=False,help="override existing configuration or not")): override:bool=typer.Option(default=False,help="override existing configuration or not")):
@ -141,26 +144,24 @@ def initregistry (email:Annotated[str,typer.Argument(help="email")],
""" """
try: try:
transport.registry.init(email=email, path=path, override=override) transport.registry.init(email=email, path=path, override=override)
_msg = f"""{CHECK_MARK} Successfully wrote configuration to {path} from {email}""" _msg = f"""{CHECK_MARK} Successfully wrote configuration to [bold]{path}[/bold] from [bold]{email}[/bold]"""
except Exception as e: except Exception as e:
_msg = f"{TIMES_MARK} {e}" _msg = f"{TIMES_MARK} {e}"
print (_msg) print (_msg)
print () print ()
@app.command(name="register") @app_r.command(name="add")
def register (label:Annotated[str,typer.Argument(help="unique label that will be used to load the parameters of the database")], def register (label:Annotated[str,typer.Argument(help="unique label that will be used to load the parameters of the database")],
auth_file:Annotated[str,typer.Argument(help="path of the auth_file")], auth_file:Annotated[str,typer.Argument(help="path of the auth_file")],
default:bool=typer.Option(default=False,help="set the auth_file as default"), default:bool=typer.Option(default=False,help="set the auth_file as default"),
path:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry file")): path:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry file")):
""" """
This function create a registery of either: This function add a database label for a given auth-file. which allows access to the database using a label of your choice.
an auth-file entries given an auth-file i.e database connection and assign it a label,
Learn more about auth-file at https://healthcareio.the-phi.com/data-transport
""" """
try: try:
if transport.registry.exists(path) : if transport.registry.exists(path) :
transport.registry.set(label=label,auth_file=auth_file, default=default, path=path) transport.registry.set(label=label,auth_file=auth_file, default=default, path=path)
_msg = f"""{CHECK_MARK} Successfully added label "{label}" to data-transport registry""" _msg = f"""{CHECK_MARK} Successfully added label [bold]"{label}"[/bold] to data-transport registry"""
else: else:
_msg = f"""{TIMES_MARK} Registry is not initialized, please initialize the registry (check help)""" _msg = f"""{TIMES_MARK} Registry is not initialized, please initialize the registry (check help)"""
except Exception as e: except Exception as e:
@ -179,7 +180,7 @@ def register_plugs (
transport.registry.plugins.init() transport.registry.plugins.init()
_log = transport.registry.plugins.add(alias,path) _log = transport.registry.plugins.add(alias,path)
_mark = TIMES_MARK if not _log else CHECK_MARK _mark = TIMES_MARK if not _log else CHECK_MARK
_msg = f"""Could NOT add the \033[1m{alias}\033[0m to the registry""" if not _log else f""" successfully added {alias}, {len(_log)} functions added""" _msg = f"""Could NOT add the [bold]{alias}[/bold]to the registry""" if not _log else f""" successfully added {alias}, {len(_log)} functions added"""
print (f"""{_mark} {_msg}""") print (f"""{_mark} {_msg}""")
@app_x.command(name="list") @app_x.command(name="list")
def registry_list (): def registry_list ():
@ -197,7 +198,7 @@ def registry_list ():
@app_x.command(name="test") @app_x.command(name="test")
def registry_test (key): def registry_test (key):
""" """
This function allows to test syntax for a plugin i.e in terms of alis@function This function allows to test syntax for a plugin i.e in terms of alias@function
""" """
_item = transport.registry.plugins.has(key=key) _item = transport.registry.plugins.has(key=key)
if _item : if _item :
@ -206,8 +207,10 @@ def registry_test (key):
print (pd.DataFrame([_item])) print (pd.DataFrame([_item]))
else: else:
print (f"{TIMES_MARK} unable to load \033[1m{key}\033[0m. Make sure it is registered") print (f"{TIMES_MARK} unable to load \033[1m{key}\033[0m. Make sure it is registered")
app.add_typer(app_e,name='etl',help="This function will run etl or generate a template etl configuration file")
app.add_typer(app_x, name="plugins") app.add_typer(app_r,name='registry',help='This function allows labeling database access information')
app.add_typer(app_i,name="info",help="This function will print either license or supported database technologies")
app.add_typer(app_x, name="plugins",help="This function enables add/list/test of plugins in the registry")
if __name__ == '__main__' : if __name__ == '__main__' :
app() app()

@ -1,6 +1,6 @@
__app_name__ = 'data-transport' __app_name__ = 'data-transport'
__author__ = 'The Phi Technology' __author__ = 'The Phi Technology'
__version__= '2.4.12' __version__= '2.4.14'
__email__ = "info@the-phi.com" __email__ = "info@the-phi.com"
__license__=f""" __license__=f"""
Copyright 2010 - 2024, Steve L. Nyemba Copyright 2010 - 2024, Steve L. Nyemba

@ -3,21 +3,17 @@ This file encapsulates common operations associated with SQL databases via SQLAl
""" """
import sqlalchemy as sqa import sqlalchemy as sqa
from sqlalchemy import text from sqlalchemy import text , MetaData, Table
import pandas as pd import pandas as pd
class Base: class Base:
__template__={"host":None,"port":1,"database":None,"table":None,"username":None,"password":None}
def __init__(self,**_args): def __init__(self,**_args):
# print ([' ## ',_args])
self._host = _args['host'] if 'host' in _args else 'localhost' self._host = _args['host'] if 'host' in _args else 'localhost'
self._port = None if 'port' not in _args else _args['port'] self._port = None
self._database = _args['database'] self._database = _args['database']
self._table = _args['table'] if 'table' in _args else None self._table = _args['table'] if 'table' in _args else None
self._engine= sqa.create_engine(self._get_uri(**_args),future=True) self._engine= sqa.create_engine(self._get_uri(**_args),future=True)
self._chunksize = 0 if 'chunksize' not in _args else _args['chunksize']
def _set_uri(self,**_args) : def _set_uri(self,**_args) :
""" """
:provider provider :provider provider
@ -38,20 +34,33 @@ class Base:
:table optional name of the table (can be fully qualified) :table optional name of the table (can be fully qualified)
""" """
_table = self._table if 'table' not in _args else _args['table'] _table = self._table if 'table' not in _args else _args['table']
_map = {'BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'}
_schema = [] _schema = []
if _table : # if _table :
if sqa.__version__.startswith('1.') : # if sqa.__version__.startswith('1.') :
_handler = sqa.MetaData(bind=self._engine) # _handler = sqa.MetaData(bind=self._engine)
_handler.reflect() # _handler.reflect()
else: # else:
# # #
# sqlalchemy's version 2.+ # # sqlalchemy's version 2.+
_handler = sqa.MetaData() # _handler = sqa.MetaData()
_handler.reflect(bind=self._engine) # _handler.reflect(bind=self._engine)
# # #
# Let us extract the schema with the native types # # Let us extract the schema with the native types
_map = {'BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'} # _map = {'BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'}
_schema = [{"name":_attr.name,"type":_map.get(str(_attr.type),str(_attr.type))} for _attr in _handler.tables[_table].columns] # _schema = [{"name":_attr.name,"type":_map.get(str(_attr.type),str(_attr.type))} for _attr in _handler.tables[_table].columns]
#
# Step 2: Create a MetaData object
metadata = MetaData()
# Step 3: Reflect the table
table = Table(_table, metadata, autoload_with=self._engine)
# Step 4: Print the column names and their types
_schema = [{"name":column.name,"type":_map.get(str(column.type),str(column.type))} for column in table.columns]
# _schema
# print(f"name: {column.name, Type: {column.type}")
return _schema return _schema
def has(self,**_args): def has(self,**_args):
return self.meta(**_args) return self.meta(**_args)
@ -62,14 +71,9 @@ class Base:
@TODO: Execution of stored procedures @TODO: Execution of stored procedures
""" """
if sql.lower().startswith('select') or sql.lower().startswith('with') :
if sql.lower().replace('\n',' ').strip().startswith('select') or sql.lower().startswith('with') : return pd.read_sql(sql,self._engine)
if self._chunksize :
return pd.read_sql(sql,self._engine,chunksize=self._chunksize)
else:
return pd.read_sql(sql,self._engine)
else: else:
_handler = self._engine.connect() _handler = self._engine.connect()
_handler.execute(text(sql)) _handler.execute(text(sql))
@ -114,11 +118,9 @@ class BaseReader(SQLBase):
if 'sql' in _args : if 'sql' in _args :
sql = _args['sql'] sql = _args['sql']
else: else:
# print (dir (self))
_table = _args['table'] if 'table' in _args else self._table _table = _args['table'] if 'table' in _args else self._table
sql = f'SELECT * FROM {_table}' sql = f'SELECT * FROM {_table}'
return self.apply(sql)
return self.apply(sql.replace('\n',' ').strip())
class BaseWriter (SQLBase): class BaseWriter (SQLBase):
@ -128,21 +130,23 @@ class BaseWriter (SQLBase):
def __init__(self,**_args): def __init__(self,**_args):
super().__init__(**_args) super().__init__(**_args)
def write(self,_data,**_args): def write(self,_data,**_args):
if type(_data) in [list,dict] : if type(_data) == dict :
_df = pd.DataFrame(_data)
elif type(_data) == list :
_df = pd.DataFrame(_data) _df = pd.DataFrame(_data)
# elif type(_data) == list :
# _df = pd.DataFrame(_data)
else: else:
_df = _data.copy() _df = _data.copy()
# #
# We are assuming we have a data-frame at this point # We are assuming we have a data-frame at this point
# #
_table = _args['table'] if 'table' in _args else self._table _table = _args['table'] if 'table' in _args else self._table
_mode = {'if_exists':'append','index':False} _mode = {'chunksize':2000000,'if_exists':'append','index':False}
if self._chunksize :
_mode['chunksize'] = self._chunksize
for key in ['if_exists','index','chunksize'] : for key in ['if_exists','index','chunksize'] :
if key in _args : if key in _args :
_mode[key] = _args[key] _mode[key] = _args[key]
# if 'schema' in _args :
# _mode['schema'] = _args['schema']
# if 'if_exists' in _args :
# _mode['if_exists'] = _args['if_exists']
_df.to_sql(_table,self._engine,**_mode) _df.to_sql(_table,self._engine,**_mode)
Loading…
Cancel
Save