You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
data-transport/transport/sql/common.py

139 lines
5.0 KiB
Python

"""
This file encapsulates common operations associated with SQL databases via SQLAlchemy
"""
import sqlalchemy as sqa
else:
_database = self._database
_provider = self.get_provider().replace(':','').replace('/','')
# _uri = [f'{_provider}:/',_account,_host,_database]
# _uri = [_item.strip() for _item in _uri if _item.strip()]
# return '/'.join(_uri)
return f'{_provider}://{_host}/{_database}' if _account == '' else f'{_provider}://{_account}{_host}/{_database}'
class BaseReader(SQLBase):
def __init__(self,**_args):
super().__init__(**_args)
def read(self,**_args):
"""
This function will read a query or table from the specific database
"""
if 'sql' in _args :
sql = _args['sql']
else:
_table = _args['table'] if 'table' in _args else self._table
sql = f'SELECT * FROM {_table}'
return self.apply(sql)
class BaseWriter (SQLBase):
"""
This class implements SQLAlchemy support for Writting to a data-store (RDBMS)
"""
def __init__(self,**_args):
super().__init__(**_args)
def write(self,_data,**_args):
if type(_data) == dict :
_df = pd.DataFrame(_data)
elif type(_data) == list :
_df = pd.DataFrame(_data)
else:
_df = _data.copy()
#
# We are assuming we have a data-frame at this point
#
_table = _args['table'] if 'table' in _args else self._table
_mode = {'chunksize':2000000,'if_exists':'append','index':False}
# if 'schema' in _args :
# _mode['schema'] = _args['schema']
# if 'if_exists' in _args :
# _mode['if_exists'] = _args['if_exists']
_df.to_sql(_table,self._engine,**_mode)