""" dependency: - spark and SPARK_HOME environment variable must be set """ from pyspark.sql import SparkSession from pyspark import SparkContext import copy class Iceberg : def __init__(self,**_args): """ providing catalog meta information (you must get this from apache iceberg) """ # # Turning off logging (it's annoying & un-professional) # # _spconf = SparkContext() # _spconf.setLogLevel("ERROR") # # @TODO: # Make arrangements for additional configuration elements # self._session = SparkSession.builder.getOrCreate() # self._session.sparkContext.setLogLevel("ERROR") self._catalog = self._session.catalog self._table = _args['table'] if 'table' in _args else None if 'catalog' in _args : # # Let us set the default catalog self._catalog.setCurrentCatalog(_args['catalog']) else: # No current catalog has been set ... pass if 'database' in _args : self._database = _args['database'] self._catalog.setCurrentDatabase(self._database) else: # # Should we set the default as the first one if available ? # pass self._catalogName = self._catalog.currentCatalog() self._databaseName = self._catalog.currentDatabase() def meta (self,**_args) : """ This function should return the schema of a table (only) """ _schema = [] try: _tableName = self._getPrefix(**_args) + f".{_args['table']}" print (_tableName) _tmp = self._session.table(_tableName).schema _schema = _tmp.jsonValue()['fields'] for _item in _schema : del _item['nullable'],_item['metadata'] except Exception as e: pass return _schema def _getPrefix (self,**_args): _catName = self._catalogName if 'catalog' not in _args else _args['catalog'] _datName = self._databaseName if 'database' not in _args else _args['database'] return '.'.join([_catName,_datName]) def has (self,**_args): try: _prefix = self._getPrefix(**_args) if _prefix.endswith('.') : return False return _args['table'] in [_item.name for _item in self._catalog.listTables(_prefix)] except Exception as e: print (e) return False def apply(self,sql): pass class Reader(Iceberg) : def __init__(self,**_args): super().__init__(**_args) def read(self,**_args): _table = self._table _prefix = self._getPrefix(**_args) if 'table' in _args or _table: _table = _args['table'] if 'table' in _args else _table _table = _prefix + f'.{_table}' return self._session.table(_table).toPandas() else: sql = _args['sql'] return self._session.sql(sql).toPandas() pass class Writer (Iceberg): """ Writing data to an Apache Iceberg data warehouse (using pyspark) """ def __init__(self,**_args): super().__init__(**_args) self._mode = 'append' if 'mode' not in _args else _args['mode'] self._table = None if 'table' not in _args else _args['table'] def write(self,_data,**_args): _prefix = self._getPrefix(**_args) if 'table' not in _args and not self._table : raise Exception (f"Table Name should be specified for catalog/database {_prefix}") rdd = self._session.createDataFrame(_data) _mode = self._mode if 'mode' not in _args else _args['mode'] _table = self._table if 'table' not in _args else _args['table'] if not self.has(table=_table) : _mode = 'overwrite' rdd.write.format('iceberg').mode(_mode).saveAsTable(_table) else: _table = f'{_prefix}.{_table}' rdd.write.format('iceberg').mode(_mode).save(_table)