Added partitioning and chunking to support robust ETL jobs and write functions

pull/1/head
Steve Nyemba 2 years ago
parent 51512c39e1
commit 245b319e7b

@@ -8,7 +8,7 @@ def read(fname):
     return open(os.path.join(os.path.dirname(__file__), fname)).read()
 args = {
     "name":"data-transport",
-    "version":"1.5.7",
+    "version":"1.5.8",
     "author":"The Phi Technology LLC","author_email":"info@the-phi.com",
     "license":"MIT",
     "packages":["transport"]}

@@ -98,15 +98,16 @@ class Console(Writer):
         self.debug = self.write
         self.log = self.write
         pass
-    def write (self,logs,**_args):
+    def write (self,logs=None,**_args):
         if self.lock :
             Console.lock.acquire()
         try:
-            if type(_args) == list:
-                for row in _args :
+            _params = _args if logs is None and _args else logs
+            if type(_params) == list:
+                for row in _params :
                     print (row)
             else:
-                print (_args)
+                print (_params)
         except Exception as e :
             print (e)
         finally:

@@ -56,20 +56,22 @@ if len(sys.argv) > 1:
 class Post(Process):
     def __init__(self,**args):
         super().__init__()
+        self.store = args['target']
         if 'provider' not in args['target'] :
+            pass
             self.PROVIDER = args['target']['type']
-            self.writer = transport.factory.instance(**args['target'])
+            # self.writer = transport.factory.instance(**args['target'])
         else:
             self.PROVIDER = args['target']['provider']
-            args['target']['context'] = 'write'
-            self.store = args['target']
+            self.store['context'] = 'write'
+            # self.store = args['target']
         self.store['lock'] = True
         # self.writer = transport.instance(**args['target'])
         #
         # If the table doesn't exist, maybe create it ?
         #
-        self.rows = args['rows'].fillna('')
+        self.rows = args['rows']
+        # self.rows = args['rows'].fillna('')

     def log(self,**_args) :
         if ETL.logger :
@@ -77,20 +79,7 @@ class Post(Process):
     def run(self):
         _info = {"values":self.rows} if 'couch' in self.PROVIDER else self.rows
-        ltypes = self.rows.dtypes.values
-        columns = self.rows.dtypes.index.tolist()
-        # if not self.writer.has() :
-        #     self.writer.make(fields=columns)
-        #     ETL.logger.info(module='write',action='make-table',input={"name":self.writer.table})
-        self.log(module='write',action='make-table',input={"schema":columns})
-        for name in columns :
-            if _info[name].dtype in ['int32','int64','int','float','float32','float64'] :
-                value = 0
-            else:
-                value = ''
-            _info[name] = _info[name].fillna(value)
         writer = transport.factory.instance(**self.store)
         writer.write(_info)
         writer.close()
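The net effect of the two Post hunks: only the target config (a plain dict) travels to the child process, and the writer connection is opened inside run(). A sketch of that pattern, assuming the data-transport package is importable and `target` is a valid provider config:

    import transport
    from multiprocessing import Process

    class PostSketch(Process):
        def __init__(self, **args):
            super().__init__()
            self.store = args['target']   # plain dict, safe to pickle to the child
            self.store['lock'] = True
            self.rows = args['rows']      # left untouched; fillna is now the writer's concern

        def run(self):
            # the connection is created in the child process, never pickled
            writer = transport.factory.instance(**self.store)
            writer.write(self.rows)
            writer.close()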
@@ -149,9 +138,11 @@ class ETL (Process):
                 # _id = ' '.join([str(i),' table ',self.name])
                 indexes = rows[i]
                 segment = idf.loc[indexes,:].copy() #.to_dict(orient='records')
+                _name = "partition-"+str(i)
                 if segment.shape[0] == 0 :
                     continue
-                proc = Post(target = self._oargs,rows = segment,name=str(i))
+                proc = Post(target = self._oargs,rows = segment,name=_name)
                 self.jobs.append(proc)
                 proc.start()
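For orientation, the surrounding loop (not shown in this hunk) presumably builds `rows` by splitting the frame's row positions with np.array_split. A self-contained sketch of the partition/naming scheme, with made-up data and partition count:

    import numpy as np
    import pandas as pd

    idf = pd.DataFrame({"x": range(10)})
    rows = np.array_split(np.arange(idf.shape[0]), 3)   # 3 partitions of row positions
    for i in range(len(rows)):
        segment = idf.iloc[rows[i]].copy()
        if segment.shape[0] == 0:
            continue
        _name = "partition-" + str(i)
        print(_name, "->", segment.shape[0], "rows")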
@@ -167,17 +158,31 @@ class ETL (Process):
         return len(self.jobs) == 0
 def instance(**_args):
     """
+    :param path, index, id optionally load job definitions from a JSON file and select one
     :param _info list of objects with {source,target}
     :param logger
     """
     logger = _args['logger'] if 'logger' in _args else None
-    _info = _args['info']
+    if 'path' in _args :
+        _info = json.loads((open(_args['path'])).read())
+        if 'index' in _args :
+            _index = int(_args['index'])
+            _info = _info[_index]
+        elif 'id' in _args :
+            _info = [_item for _item in _info if 'id' in _item and _item['id'] == _args['id']]
+            _info = _info[0] if _info else _info
+    else:
+        _info = _args['info']
     if logger and type(logger) != str:
         ETL.logger = logger
     elif logger == 'console':
-        ETL.logger = transport.factory.instance(provider='console',lock=True)
+        ETL.logger = transport.factory.instance(provider='console',context='write',lock=True)
     if type(_info) in [list,dict] :
-        _config = _info if type(_info) != dict else [_info]
+        _info = _info if type(_info) != dict else [_info]
         #
         # The assumption here is that the objects within the list are {source,target}
         jobs = []
@@ -185,6 +190,7 @@ def instance(**_args):
         _item['jobs'] = 5 if 'procs' not in _args else int(_args['procs'])
         _job = ETL(**_item)
         _job.start()
         jobs.append(_job)
     return jobs
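instance() can now pull its {source,target} job list from a JSON file and narrow it down by index or id. A sketch of the selection logic against a hypothetical etl-config.json (file name and contents are assumptions for illustration):

    import json

    # hypothetical file: a JSON list of {source,target} job definitions
    _info = json.loads(open("etl-config.json").read())

    _args = {"index": "1"}                    # or {"id": "patients"}
    if "index" in _args:
        _info = _info[int(_args["index"])]    # pick the n-th job
    elif "id" in _args:
        matches = [_item for _item in _info if _item.get("id") == _args["id"]]
        _info = matches[0] if matches else matches

With that in place, a call along the lines of instance(path='etl-config.json', index=0, logger='console') would run a single job from the file; the exact invocation depends on how the script is packaged.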

@@ -8,6 +8,10 @@ Permission is hereby granted, free of charge, to any person obtaining a copy of
 The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+@TODO:
+    - Migrate SQLite to the SQL hierarchy
+    - Include chunked writes from pandas
 """
 import psycopg2 as pg
 import mysql.connector as my
@@ -31,6 +35,7 @@ import os
 class SQLRW :
     lock = RLock()
+    MAX_CHUNK = 2000000
     DRIVERS = {"postgresql":pg,"redshift":pg,"mysql":my,"mariadb":my,"netezza":nz}
     REFERENCE = {
         "netezza":{"port":5480,"handler":nz,"dtype":"VARCHAR(512)"},
@@ -47,6 +52,7 @@ class SQLRW :
         self.table = _args['table'] if 'table' in _args else None
         self.fields = _args['fields'] if 'fields' in _args else []
         self.schema = _args['schema'] if 'schema' in _args else ''
+        self._chunks = 1 if 'chunks' not in _args else int(_args['chunks'])
         self._provider = _args['provider'] if 'provider' in _args else None
         # _info['host'] = 'localhost' if 'host' not in _args else _args['host']
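From the caller's side, the new knob is just another factory argument. A sketch of how one might request chunked writes, assuming a reachable PostgreSQL instance (the connection keys shown vary by provider and are illustrative):

    import transport

    writer = transport.factory.instance(
        provider='postgresql', database='demo', table='events',
        context='write',
        chunks=4    # new: split large frames into 4 batches on write
    )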
@@ -103,6 +109,13 @@ class SQLRW :
             _m = sqlalchemy.MetaData(bind=self._engine)
             _m.reflect()
             schema = [{"name":_attr.name,"type":str(_attr.type)} for _attr in _m.tables[table].columns]
+            #
+            # Some housekeeping: normalize dialect-specific type names
+            _m = {'BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'}
+            for _item in schema :
+                if _item['type'] in _m :
+                    _item['type'] = _m[_item['type']]
         except Exception as e:
             pass
         return schema
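The normalization pass is self-contained enough to demonstrate standalone; note that the hunk reuses the name `_m` for the mapping, shadowing the MetaData object above it. A runnable illustration with made-up column names:

    _map = {'BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT',
            'NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'}
    schema = [{"name":"id","type":"BIGINT"},
              {"name":"note","type":"TEXT"},
              {"name":"score","type":"NUMERIC"}]
    for _item in schema:
        if _item['type'] in _map:
            _item['type'] = _map[_item['type']]
    print(schema)   # types become INTEGER / STRING / FLOAT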
@@ -258,14 +271,7 @@ class SQLWriter(SQLRW,Writer):
             # _fields = info.keys() if type(info) == dict else info[0].keys()
             _fields = list (_fields)
             self.init(_fields)
-        #
-        # @TODO: Use pandas/odbc ? Not sure b/c it requires sqlalchemy
-        #
-        # if type(info) != list :
-        #     #
-        #     # We are assuming 2 cases i.e dict or pd.DataFrame
-        #     info = [info] if type(info) == dict else info.values.tolist()
         try:
             table = _args['table'] if 'table' in _args else self.table
             table = self._tablename(table)
@@ -284,22 +290,36 @@ class SQLWriter(SQLRW,Writer):
             return
         if self.lock :
             SQLRW.lock.acquire()
-        if self._engine is not None:
-            # pd.to_sql(_info,self._engine)
-            if self.schema in ['',None] :
-                rows = _info.to_sql(table,self._engine,if_exists='append',index=False)
-            else:
-                rows = _info.to_sql(self.table,self._engine,schema=self.schema,if_exists='append',index=False)
-        else:
-            _fields = ",".join(self.fields)
-            _sql = _sql.replace(":fields",_fields)
-            values = ", ".join("?"*len(self.fields)) if self._provider == 'netezza' else ",".join(["%s" for name in self.fields])
-            _sql = _sql.replace(":values",values)
-            cursor = self.conn.cursor()
-            cursor.executemany(_sql,_info.values.tolist())
-            cursor.close()
+        #
+        # We will adjust the chunks here in case we are not always sure of the volume of data
+        if self._chunks == 1 and _info.shape[0] > SQLRW.MAX_CHUNK :
+            self._chunks = 10
+        _indexes = np.array_split(np.arange(_info.shape[0]),self._chunks)
+        for i in _indexes :
+            #
+            # In case we have an invalid chunk ...
+            if _info.iloc[i].shape[0] == 0 :
+                continue
+            #
+            # We are enabling writing by chunks/batches because some persistence layers have quotas or limitations on the volume of data
+            if self._engine is not None:
+                # pd.to_sql(_info,self._engine)
+                if self.schema in ['',None] :
+                    rows = _info.iloc[i].to_sql(table,self._engine,if_exists='append',index=False)
+                else:
+                    #
+                    # Writing with schema information ...
+                    rows = _info.iloc[i].to_sql(self.table,self._engine,schema=self.schema,if_exists='append',index=False)
+            else:
+                _fields = ",".join(self.fields)
+                _sql = _sql.replace(":fields",_fields)
+                values = ", ".join("?"*len(self.fields)) if self._provider == 'netezza' else ",".join(["%s" for name in self.fields])
+                _sql = _sql.replace(":values",values)
+                cursor = self.conn.cursor()
+                cursor.executemany(_sql,_info.iloc[i].values.tolist())
+                cursor.close()
         # cursor.commit()
         # self.conn.commit()
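Why the empty-chunk guard: np.array_split happily produces empty index arrays when there are more chunks than rows. A runnable sketch of the batching skeleton, with a print standing in for the actual to_sql/executemany call:

    import numpy as np
    import pandas as pd

    _info = pd.DataFrame({"x": range(5)})
    _chunks = 10                                # more chunks than rows
    for i in np.array_split(np.arange(_info.shape[0]), _chunks):
        batch = _info.iloc[i]
        if batch.shape[0] == 0:                 # the guard the hunk adds
            continue
        print("writing", batch.shape[0], "row(s)")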
@@ -382,6 +402,7 @@ class BQWriter(BigQuery,Writer):
         self.parallel = False if 'lock' not in _args else _args['lock']
         self.table = _args['table'] if 'table' in _args else None
         self.mode = {'if_exists':'append','chunksize':900000,'destination_table':self.table,'credentials':self.credentials}
+        self._chunks = 1 if 'chunks' not in _args else int(_args['chunks'])

     def write(self,_info,**_args) :
         try:
@@ -409,8 +430,13 @@ class BQWriter(BigQuery,Writer):
             self.mode['table_schema'] = _args['schema']
         # _mode = copy.deepcopy(self.mode)
         _mode = self.mode
-        _df.to_gbq(**self.mode) #if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000)
+        # _df.to_gbq(**self.mode) #if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000)
+        #
+        # Let us adjust the chunking here
+        self._chunks = 10 if _df.shape[0] > SQLRW.MAX_CHUNK and self._chunks == 1 else self._chunks
+        _indexes = np.array_split(np.arange(_df.shape[0]),self._chunks)
+        for i in _indexes :
+            _df.iloc[i].to_gbq(**self.mode)
         pass
 #
 # Aliasing the big query classes allowing it to be backward compatible
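Same idea on the BigQuery side: bump the chunk count for very large frames, then hand each slice to pandas' to_gbq. A sketch of the path, assuming pandas-gbq is installed and `mode` holds valid credentials and a destination table (both elided here):

    import numpy as np

    MAX_CHUNK = 2000000

    def write_gbq_in_chunks(df, mode, chunks=1):
        # widen the chunking only when a huge frame arrives with the default setting
        chunks = 10 if df.shape[0] > MAX_CHUNK and chunks == 1 else chunks
        for i in np.array_split(np.arange(df.shape[0]), chunks):
            df.iloc[i].to_gbq(**mode)   # pandas-gbq call; mode carries credentials etc.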
