From 245b319e7b8b0b80234f2a2f3549a026a1c9468a Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Wed, 3 Aug 2022 12:07:27 -0500 Subject: [PATCH] added partitioning and chunking to support healthy ETL jobs or write functions --- setup.py | 2 +- transport/common.py | 9 +++--- transport/etl.py | 52 +++++++++++++++++-------------- transport/sql.py | 76 ++++++++++++++++++++++++++++++--------------- 4 files changed, 86 insertions(+), 53 deletions(-) diff --git a/setup.py b/setup.py index 2983f70..aecb441 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ def read(fname): return open(os.path.join(os.path.dirname(__file__), fname)).read() args = { "name":"data-transport", - "version":"1.5.7", + "version":"1.5.8", "author":"The Phi Technology LLC","author_email":"info@the-phi.com", "license":"MIT", "packages":["transport"]} diff --git a/transport/common.py b/transport/common.py index 8234290..d9a3fab 100644 --- a/transport/common.py +++ b/transport/common.py @@ -98,15 +98,16 @@ class Console(Writer): self.debug = self.write self.log = self.write pass - def write (self,logs,**_args): + def write (self,logs=None,**_args): if self.lock : Console.lock.acquire() try: - if type(_args) == list: - for row in _args : + _params = _args if logs is None and _args else logs + if type(_params) == list: + for row in _params : print (row) else: - print (_args) + print (_params) except Exception as e : print (e) finally: diff --git a/transport/etl.py b/transport/etl.py index 6783cc6..83c6147 100644 --- a/transport/etl.py +++ b/transport/etl.py @@ -56,20 +56,22 @@ if len(sys.argv) > 1: class Post(Process): def __init__(self,**args): super().__init__() - + self.store = args['target'] if 'provider' not in args['target'] : + pass self.PROVIDER = args['target']['type'] - self.writer = transport.factory.instance(**args['target']) + # self.writer = transport.factory.instance(**args['target']) else: self.PROVIDER = args['target']['provider'] - args['target']['context'] = 'write' - self.store = 
args['target'] + self.store['context'] = 'write' + # self.store = args['target'] self.store['lock'] = True # self.writer = transport.instance(**args['target']) # # If the table doesn't exists maybe create it ? # - self.rows = args['rows'].fillna('') + self.rows = args['rows'] + # self.rows = args['rows'].fillna('') def log(self,**_args) : if ETL.logger : @@ -77,20 +79,7 @@ class Post(Process): def run(self): _info = {"values":self.rows} if 'couch' in self.PROVIDER else self.rows - ltypes = self.rows.dtypes.values - columns = self.rows.dtypes.index.tolist() - # if not self.writer.has() : - - - # self.writer.make(fields=columns) - # ETL.logger.info(module='write',action='make-table',input={"name":self.writer.table}) - self.log(module='write',action='make-table',input={"schema":columns}) - for name in columns : - if _info[name].dtype in ['int32','int64','int','float','float32','float64'] : - value = 0 - else: - value = '' - _info[name] = _info[name].fillna(value) + writer = transport.factory.instance(**self.store) writer.write(_info) writer.close() @@ -149,9 +138,11 @@ class ETL (Process): # _id = ' '.join([str(i),' table ',self.name]) indexes = rows[i] segment = idf.loc[indexes,:].copy() #.to_dict(orient='records') + _name = "partition-"+str(i) if segment.shape[0] == 0 : continue - proc = Post(target = self._oargs,rows = segment,name=str(i)) + + proc = Post(target = self._oargs,rows = segment,name=_name) self.jobs.append(proc) proc.start() @@ -167,17 +158,31 @@ class ETL (Process): return len(self.jobs) == 0 def instance(**_args): """ + :path ,index, id :param _info list of objects with {source,target}` :param logger """ logger = _args['logger'] if 'logger' in _args else None - _info = _args['info'] + if 'path' in _args : + _info = json.loads((open(_args['path'])).read()) + + + if 'index' in _args : + _index = int(_args['index']) + _info = _info[_index] + + elif 'id' in _args : + _info = [_item for _item in _info if '_id' in _item and _item['id'] == _args['id']] + 
_info = _info[0] if _info else _info + else: + _info = _args['info'] + if logger and type(logger) != str: ETL.logger = logger elif logger == 'console': - ETL.logger = transport.factory.instance(provider='console',lock=True) + ETL.logger = transport.factory.instance(provider='console',context='write',lock=True) if type(_info) in [list,dict] : - _config = _info if type(_info) != dict else [_info] + _info = _info if type(_info) != dict else [_info] # # The assumption here is that the objects within the list are {source,target} jobs = [] @@ -185,6 +190,7 @@ def instance(**_args): _item['jobs'] = 5 if 'procs' not in _args else int(_args['procs']) _job = ETL(**_item) + _job.start() jobs.append(_job) return jobs diff --git a/transport/sql.py b/transport/sql.py index 50ba1ed..08f3648 100644 --- a/transport/sql.py +++ b/transport/sql.py @@ -8,6 +8,10 @@ Permission is hereby granted, free of charge, to any person obtaining a copy of The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ +@TODO: + - Migrate SQLite to SQL hierarchy + - Include Write in Chunks from pandas """ import psycopg2 as pg import mysql.connector as my @@ -31,6 +35,7 @@ import os class SQLRW : lock = RLock() + MAX_CHUNK = 2000000 DRIVERS = {"postgresql":pg,"redshift":pg,"mysql":my,"mariadb":my,"netezza":nz} REFERENCE = { "netezza":{"port":5480,"handler":nz,"dtype":"VARCHAR(512)"}, @@ -47,6 +52,7 @@ class SQLRW : self.table = _args['table'] if 'table' in _args else None self.fields = _args['fields'] if 'fields' in _args else [] self.schema = _args['schema'] if 'schema' in _args else '' + self._chunks = 1 if 'chunks' not in _args else int(_args['chunks']) self._provider = _args['provider'] if 'provider' in _args else None # _info['host'] = 'localhost' if 'host' not in _args else _args['host'] @@ -103,6 +109,13 @@ class SQLRW : _m = sqlalchemy.MetaData(bind=self._engine) _m.reflect() schema = [{"name":_attr.name,"type":str(_attr.type)} for _attr in _m.tables[table].columns] + # + # Some house keeping work + _m = {'BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'} + for _item in schema : + if _item['type'] in _m : + _item['type'] = _m[_item['type']] + except Exception as e: pass return schema @@ -258,14 +271,7 @@ class SQLWriter(SQLRW,Writer): # _fields = info.keys() if type(info) == dict else info[0].keys() _fields = list (_fields) self.init(_fields) - # - # @TODO: Use pandas/odbc ? 
Not sure b/c it requires sqlalchemy - # - # if type(info) != list : - # # - # # We are assuming 2 cases i.e dict or pd.DataFrame - # info = [info] if type(info) == dict else info.values.tolist() - + try: table = _args['table'] if 'table' in _args else self.table table = self._tablename(table) @@ -284,22 +290,36 @@ return if self.lock : SQLRW.lock.acquire() - - if self._engine is not None: - # pd.to_sql(_info,self._engine) - if self.schema in ['',None] : - rows = _info.to_sql(table,self._engine,if_exists='append',index=False) - else: - rows = _info.to_sql(self.table,self._engine,schema=self.schema,if_exists='append',index=False) + # + # we will adjust the chunks here in case we are not always sure of the volume of data being written + if self._chunks == 1 and _info.shape[0] > SQLRW.MAX_CHUNK : + self._chunks = 10 + _indexes = np.array_split(np.arange(_info.shape[0]),self._chunks) + for i in _indexes : + # + # In case we have an invalid chunk ... + if _info.iloc[i].shape[0] == 0 : + continue + # + # We are enabling writing by chunks/batches because some persistent layers have quotas or limitations on volume of data - else: - _fields = ",".join(self.fields) - _sql = _sql.replace(":fields",_fields) - values = ", ".join("?"*len(self.fields)) if self._provider == 'netezza' else ",".join(["%s" for name in self.fields]) - _sql = _sql.replace(":values",values) - cursor = self.conn.cursor() - cursor.executemany(_sql,_info.values.tolist()) - cursor.close() + if self._engine is not None: + # pd.to_sql(_info,self._engine) + if self.schema in ['',None] : + rows = _info.iloc[i].to_sql(table,self._engine,if_exists='append',index=False) + else: + # + # Writing with schema information ... 
+ rows = _info.iloc[i].to_sql(self.table,self._engine,schema=self.schema,if_exists='append',index=False) + + else: + _fields = ",".join(self.fields) + _sql = _sql.replace(":fields",_fields) + values = ", ".join("?"*len(self.fields)) if self._provider == 'netezza' else ",".join(["%s" for name in self.fields]) + _sql = _sql.replace(":values",values) + cursor = self.conn.cursor() + cursor.executemany(_sql,_info.iloc[i].values.tolist()) + cursor.close() # cursor.commit() # self.conn.commit() @@ -382,6 +402,7 @@ class BQWriter(BigQuery,Writer): self.parallel = False if 'lock' not in _args else _args['lock'] self.table = _args['table'] if 'table' in _args else None self.mode = {'if_exists':'append','chunksize':900000,'destination_table':self.table,'credentials':self.credentials} + self._chunks = 1 if 'chunks' not in _args else int(_args['chunks']) def write(self,_info,**_args) : try: @@ -409,8 +430,13 @@ self.mode['table_schema'] = _args['schema'] # _mode = copy.deepcopy(self.mode) _mode = self.mode - _df.to_gbq(**self.mode) #if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000) - + # _df.to_gbq(**self.mode) #if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000) + # + # Let us adjust the chunking here + self._chunks = 10 if _df.shape[0] > SQLRW.MAX_CHUNK and self._chunks == 1 else self._chunks + _indexes = np.array_split(np.arange(_df.shape[0]),self._chunks) + for i in _indexes : + _df.iloc[i].to_gbq(**self.mode) pass # # Aliasing the big query classes allowing it to be backward compatible