""" dependency: - spark and SPARK_HOME environment variable must be set """ from pyspark.sql import SparkSession from pyspark import SparkContext import copy class Iceberg : def __init__(self,**_args): """ providing catalog meta information (you must get this from apache iceberg) """ # # Turning off logging (it's annoying & un-professional) # # _spconf = SparkContext() # _spconf.setLogLevel("ERROR") # # @TODO: # Make arrangements for additional configuration elements # self._session = SparkSession.builder.appName("data-transport").getOrCreate() # self._session.sparkContext.setLogLevel("ERROR") self._catalog = self._session.catalog self._table = _args['table'] if 'table' in _args else None if 'catalog' in _args : # # Let us set the default catalog self._catalog.setCurrentCatalog(_args['catalog']) else: # No current catalog has been set ... pass if 'database' in _args : self._database = _args['database'] self._catalog.setCurrentDatabase(self._database) else: # # Should we set the default as the first one if available ? # pass self._catalogName = self._catalog.currentCatalog() self._databaseName = self._catalog.currentDatabase() def meta (self,**_args) : """ This function should return the schema of a table (only) """ _schema = [] try: _tableName = self._getPrefix(**_args) + f".{_args['table']}" _tmp = self._session.table(_tableName).schema _schema = _tmp.jsonValue()['fields'] for _item in _schema : del _item['nullable'],_item['metadata'] except Exception as e: pass return _schema def _getPrefix (self,**_args): _catName = self._catalogName if 'catalog' not in _args else _args['catalog'] _datName = self._databaseName if 'database' not in _args else _args['database'] return '.'.join([_catName,_datName]) def apply(self,_query): """ sql query/command to run against apache iceberg """ return self._session.sql(_query) def has (self,**_args): try: _prefix = self._getPrefix(**_args) if _prefix.endswith('.') : return False return _args['table'] in [_item.name for _item in self._catalog.listTables(_prefix)] except Exception as e: print (e) return False def close(self): self._session.stop() class Reader(Iceberg) : def __init__(self,**_args): super().__init__(**_args) def read(self,**_args): _table = self._table _prefix = self._getPrefix(**_args) if 'table' in _args or _table: _table = _args['table'] if 'table' in _args else _table _table = _prefix + f'.{_table}' return self._session.table(_table).toPandas() else: sql = _args['sql'] return self._session.sql(sql).toPandas() pass class Writer (Iceberg): """ Writing data to an Apache Iceberg data warehouse (using pyspark) """ def __init__(self,**_args): super().__init__(**_args) self._mode = 'append' if 'mode' not in _args else _args['mode'] self._table = None if 'table' not in _args else _args['table'] def write(self,_data,**_args): _prefix = self._getPrefix(**_args) if 'table' not in _args and not self._table : raise Exception (f"Table Name should be specified for catalog/database {_prefix}") rdd = self._session.createDataFrame(_data,verifySchema=False) _mode = self._mode if 'mode' not in _args else _args['mode'] _table = self._table if 'table' not in _args else _args['table'] # print (_data.shape,_mode,_table) if not self._session.catalog.tableExists(_table): # # @TODO: # # add partitioning information here rdd.writeTo(_table).using('iceberg').create() # # _mode = 'overwrite' # # rdd.write.format('iceberg').mode(_mode).saveAsTable(_table) else: # rdd.writeTo(_table).append() # # _table = f'{_prefix}.{_table}' 
rdd.coalesce(10).write.format('iceberg').mode('append').save(_table)
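

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the module's API): it assumes the
# Spark session is already configured with an Iceberg catalog (e.g. through
# spark.sql.catalog.* settings), and the names 'local', 'demo' and 'people'
# below are placeholders for a catalog, database and table in your environment.
#
if __name__ == '__main__':
    import pandas as pd

    # write a small pandas DataFrame to a table in the 'local' catalog / 'demo' database
    _writer = Writer(catalog='local', database='demo')
    _writer.write(pd.DataFrame([{'name': 'jane', 'age': 34}]), table='people')

    # read the same table back; both calls reuse the shared Spark session
    _reader = Reader(catalog='local', database='demo')
    print(_reader.meta(table='people'))   # schema as a list of {name, type} fields
    print(_reader.read(table='people'))   # table contents as a pandas DataFrame
    _reader.close()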