diff --git a/bin/transport b/bin/transport index 4053c4e..39c362c 100755 --- a/bin/transport +++ b/bin/transport @@ -24,7 +24,8 @@ from multiprocessing import Process import os import transport -from transport import etl +# from transport import etl +from transport.iowrapper import IETL # from transport import providers import typer from typing_extensions import Annotated @@ -60,10 +61,13 @@ def apply (path:Annotated[str,typer.Argument(help="path of the configuration fil _config = [_config[ int(index)]] jobs = [] for _args in _config : - pthread = etl.instance(**_args) #-- automatically starts the process + # pthread = etl.instance(**_args) #-- automatically starts the process + _worker = IETL(**_args) + pthread = Process(target=_worker.run) + pthread.start() jobs.append(pthread) # - # @TODO: Log the number of processes started and estimated time + # @TODO: Log the number of processes started and estimated time while jobs : jobs = [pthread for pthread in jobs if pthread.is_alive()] time.sleep(1) @@ -88,6 +92,7 @@ def version(): """ print (transport.__app_name__,'version ',transport.__version__) + print () print (transport.__license__) @app.command() diff --git a/transport/iowrapper.py b/transport/iowrapper.py index e3abf6c..e78fca9 100644 --- a/transport/iowrapper.py +++ b/transport/iowrapper.py @@ -103,6 +103,14 @@ class IETL(IReader) : # # If the parent is already multiprocessing self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess'] + def run(self) : + """ + We should apply the etl here, if we are in multiprocessing mode + """ + _data = super().read() + for _kwargs in self._targets : + self.post(_data,**_kwargs) + def read(self,**_args): _data = super().read(**_args) diff --git a/transport/warehouse/iceberg.py b/transport/warehouse/iceberg.py index 3083e68..4f6eca0 100644 --- a/transport/warehouse/iceberg.py +++ b/transport/warehouse/iceberg.py @@ -85,7 +85,7 @@ class Writer (Iceberg): 
""" def __init__(self,**_args): super().__init__(**_args) - self._mode = 'append' + self._mode = 'append' if 'mode' not in _args else _args['mode'] self._table = None if 'table' not in _args else _args['table'] def write(self,_data,**_args): _prefix = self._getPrefix(**_args) @@ -94,5 +94,10 @@ class Writer (Iceberg): rdd = self._session.createDataFrame(_data) _mode = self._mode if 'mode' not in _args else _args['mode'] _table = self._table if 'table' not in _args else _args['table'] - _table = f'{_prefix}.{_table}' - rdd.write.format('iceberg').mode(self._mode).save(_table) + + if not self.has(table=_table) : + _mode = 'overwrite' + rdd.write.format('iceberg').mode(_mode).saveAsTable(_table) + else: + _table = f'{_prefix}.{_table}' + rdd.write.format('iceberg').mode(_mode).save(_table)