commit 2a72de4cd6 (parent d0e655e7e3)
@@ -0,0 +1,7 @@
"""
This namespace/package is intended to handle reads/writes against data warehouse solutions such as:
    - Apache Iceberg
    - ClickHouse (...)
"""

from . import iceberg, drill
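A minimal sketch of the intended entry point. The import path (transport.warehouse) and the connection values are illustrative assumptions, not part of this change; each backend module below exposes Reader/Writer classes built the same way.

    from transport.warehouse import drill, iceberg   # path assumed for illustration

    # pick the module that matches the warehouse technology
    _reader = drill.Reader(host='localhost', database='dfs.tmp', table='users')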
@@ -0,0 +1,55 @@
import pandas as pd
import sqlalchemy as sqa

from ..sql.common import BaseReader, BaseWriter


class Drill:

    __template = {'host': None, 'port': None, 'ssl': None, 'table': None, 'database': None}

    def __init__(self, **_args):

        self._host = _args['host'] if 'host' in _args else 'localhost'
        self._port = _args['port'] if 'port' in _args else self.get_default_port()
        self._ssl = False if 'ssl' not in _args else _args['ssl']

        self._table = _args['table'] if 'table' in _args else None
        if self._table and '.' in self._table:
            _seg = self._table.split('.')
            if len(_seg) > 2:
                # qualified table name: the first two segments are schema and database
                self._schema, self._database = _seg[:2]
        else:
            self._database = _args['database']
            self._schema = self._database.split('.')[0]

    def _get_uri(self, **_args):
        return f'drill+sadrill://{self._host}:{self._port}/{self._database}?use_ssl={self._ssl}'

    def get_provider(self):
        return "drill+sadrill"

    def get_default_port(self):
        return "8047"

    def meta(self, **_args):
        """
        Return the column names/types of a table, read from INFORMATION_SCHEMA.
        Expects self._engine to have been initialized (the Reader subclass creates it).
        """
        _table = _args['table'] if 'table' in _args else self._table
        if '.' in _table:
            _schema = '.'.join(_table.split('.')[:2])
            _table = _table.split('.')[-1]
        else:
            _schema = self._schema

        _sql = f"select COLUMN_NAME AS name, CASE WHEN DATA_TYPE ='CHARACTER VARYING' THEN 'CHAR ( '||COLUMN_SIZE||' )' ELSE DATA_TYPE END AS type from INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA='{_schema}' and TABLE_NAME='{_table}'"
        try:
            _df = pd.read_sql(_sql, self._engine)
            return _df.to_dict(orient='records')
        except Exception as e:
            print(e)
        return []


class Reader(Drill, BaseReader):
    def __init__(self, **_args):
        super().__init__(**_args)
        self._chunksize = 0 if 'chunksize' not in _args else _args['chunksize']
        self._engine = sqa.create_engine(self._get_uri(), future=True)


class Writer(Drill, BaseWriter):
    def __init__(self, **_args):
        super().__init__(**_args)
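A short usage sketch for this module. The host, database, and table values are placeholders, and the import path is assumed; the connection relies on the drill+sadrill sqlalchemy dialect produced by _get_uri.

    from transport.warehouse import drill   # path assumed for illustration

    _reader = drill.Reader(host='localhost', database='dfs.tmp', table='users')
    # resulting engine URI: drill+sadrill://localhost:8047/dfs.tmp?use_ssl=False
    print(_reader.meta())   # e.g. [{'name': 'id', 'type': 'INTEGER'}, ...]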
@@ -0,0 +1,151 @@
"""
Dependencies:
    - Spark must be installed and the SPARK_HOME environment variable must be set.
NOTE:
    When using the streaming option, ensure the batch size is in line with the default (1000 rows) or increase it in spark-defaults.conf.
"""
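#
# Illustrative sketch only (not part of this module): an Iceberg-enabled session is
# typically configured in spark-defaults.conf or programmatically. The catalog name
# ('demo'), catalog type, and warehouse path below are placeholders, and the
# iceberg-spark-runtime package must be on the Spark classpath.
#
#   SparkSession.builder.appName('data-transport') \
#       .config('spark.sql.extensions',
#               'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
#       .config('spark.sql.catalog.demo', 'org.apache.iceberg.spark.SparkCatalog') \
#       .config('spark.sql.catalog.demo.type', 'hadoop') \
#       .config('spark.sql.catalog.demo.warehouse', '/tmp/warehouse') \
#       .getOrCreate()
#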
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import col, to_date, to_timestamp
import copy

class Iceberg:
    def __init__(self, **_args):
        """
        Provide catalog meta information (obtained from the Apache Iceberg configuration).
        """
        #
        # Turning off logging (it's noisy)
        #
        # _spconf = SparkContext()
        # _spconf.setLogLevel("ERROR")
        #
        # @TODO:
        #   Make arrangements for additional configuration elements
        #
        self._session = SparkSession.builder.appName("data-transport").getOrCreate()
        self._session.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
        # self._session.sparkContext.setLogLevel("ERROR")
        self._catalog = self._session.catalog
        self._table = _args['table'] if 'table' in _args else None

        if 'catalog' in _args:
            #
            # Set the default catalog
            self._catalog.setCurrentCatalog(_args['catalog'])
        else:
            # No current catalog has been set ...
            pass
        if 'database' in _args:
            self._database = _args['database']
            self._catalog.setCurrentDatabase(self._database)
        else:
            #
            # Should we set the default to the first one if available?
            #
            pass
        self._catalogName = self._catalog.currentCatalog()
        self._databaseName = self._catalog.currentDatabase()
    def meta(self, **_args):
        """
        Return the schema of a table (only).
        """
        _schema = []
        try:
            _table = _args['table'] if 'table' in _args else self._table
            _tableName = self._getPrefix(**_args) + f".{_table}"
            _tmp = self._session.table(_tableName).schema
            _schema = _tmp.jsonValue()['fields']
            for _item in _schema:
                del _item['nullable'], _item['metadata']
        except Exception as e:
            pass
        return _schema

    def _getPrefix(self, **_args):
        _catName = self._catalogName if 'catalog' not in _args else _args['catalog']
        _datName = self._databaseName if 'database' not in _args else _args['database']
        return '.'.join([_catName, _datName])

    def apply(self, _query):
        """
        Run a SQL query/command against Apache Iceberg.
        """
        return self._session.sql(_query)

    def has(self, **_args):
        try:
            _prefix = self._getPrefix(**_args)
            if _prefix.endswith('.'):
                return False
            return _args['table'] in [_item.name for _item in self._catalog.listTables(_prefix)]
        except Exception as e:
            print(e)
            return False

    def close(self):
        self._session.stop()
class Reader(Iceberg):
    def __init__(self, **_args):
        super().__init__(**_args)

    def read(self, **_args):
        _table = self._table
        _prefix = self._getPrefix(**_args)
        if 'table' in _args or _table:
            # read a whole table into a pandas dataframe
            _table = _args['table'] if 'table' in _args else _table
            _table = _prefix + f'.{_table}'
            return self._session.table(_table).toPandas()
        else:
            # otherwise run the provided SQL statement
            sql = _args['sql']
            return self._session.sql(sql).toPandas()
class Writer(Iceberg):
    """
    Write data to an Apache Iceberg data warehouse (using pyspark).
    """
    def __init__(self, **_args):
        super().__init__(**_args)
        self._mode = 'append' if 'mode' not in _args else _args['mode']
        self._table = None if 'table' not in _args else _args['table']

    def format(self, _schema):
        """
        Turn a list of {'name': ..., 'type': ...} entries into a Spark StructType.
        Unknown types fall back to StringType.
        """
        _iceSchema = StructType([])
        _map = {'integer': IntegerType(), 'float': DoubleType(), 'double': DoubleType(), 'date': DateType(),
                'timestamp': TimestampType(), 'datetime': TimestampType(), 'string': StringType(), 'varchar': StringType()}
        for _item in _schema:
            _name = _item['name']
            _type = _item['type'].lower()
            _iceType = _map[_type] if _type in _map else StringType()
            _iceSchema.add(StructField(_name, _iceType, True))
        return _iceSchema if len(_iceSchema) else []
    def write(self, _data, **_args):
        _prefix = self._getPrefix(**_args)
        if 'table' not in _args and not self._table:
            raise Exception(f"Table name should be specified for catalog/database {_prefix}")
        _schema = self.format(_args['schema']) if 'schema' in _args else []
        if not _schema:
            rdd = self._session.createDataFrame(_data, verifySchema=False)
        else:
            rdd = self._session.createDataFrame(_data, schema=_schema, verifySchema=True)
        _mode = self._mode if 'mode' not in _args else _args['mode']
        _table = self._table if 'table' not in _args else _args['table']

        if not self._session.catalog.tableExists(_table):
            # @TODO:
            #   add partitioning information here
            rdd.writeTo(_table).using('iceberg').create()
        else:
            # rdd.writeTo(_table).append()
            rdd.coalesce(10).write.format('iceberg').mode('append').save(_table)
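A usage sketch for the Iceberg Reader/Writer above. The import path, catalog, database, and table names are illustrative placeholders; the catalog ('demo') is assumed to be configured as sketched near the top of the module.

    import pandas as pd
    from transport.warehouse import iceberg   # path assumed for illustration

    _args = {'catalog': 'demo', 'database': 'db', 'table': 'users'}

    _writer = iceberg.Writer(**_args)
    _schema = [{'name': 'id', 'type': 'integer'}, {'name': 'name', 'type': 'string'}]
    _writer.write(pd.DataFrame([{'id': 1, 'name': 'alice'}]), schema=_schema)

    _reader = iceberg.Reader(**_args)
    _df = _reader.read()                                            # demo.db.users as a pandas dataframe
    _n = _reader.apply('select count(*) from demo.db.users').toPandas()  # arbitrary SQL
    _reader.close()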