Compare commits
38 Commits
| SHA1 | Date |
|---|---|
| b461ce9d7b | 3 days ago |
| fbdb4a4931 | 3 days ago |
| 6e1c420952 | 1 week ago |
| 66d881fdda | 1 week ago |
| 6c26588462 | 2 weeks ago |
| de4e065ca6 | 2 weeks ago |
| e035f5eba0 | 2 weeks ago |
| 6f8019f582 | 3 weeks ago |
| d3517a5720 | 4 weeks ago |
| b0cd0b85dc | 2 months ago |
| 4c98e81c14 | 3 months ago |
| 4b34c746ae | 3 months ago |
| 0977ad1b18 | 4 months ago |
| 98ef8a848e | 4 months ago |
| 469c6f89a2 | 4 months ago |
| dd10f6db78 | 4 months ago |
| dad2956a8c | 4 months ago |
| eaa2b99a2d | 4 months ago |
| a1b5f2743c | 5 months ago |
| afa442ea8d | 5 months ago |
| 30645e46bd | 5 months ago |
| cdf783143e | 5 months ago |
| 1a8112f152 | 5 months ago |
| 49ebd4a432 | 5 months ago |
| c3627586b3 | 6 months ago |
| 2a72de4cd6 | 6 months ago |
| d0e655e7e3 | 8 months ago |
| 492dc8f374 | 10 months ago |
| e848367378 | 10 months ago |
| c872ba8cc2 | 10 months ago |
| baa8164f16 | 1 year ago |
| 31556ebd32 | 1 year ago |
| 1e7839198a | 1 year ago |
| dce50a967e | 1 year ago |
| 5ccb073865 | 1 year ago |
| 3081fb98e7 | 1 year ago |
| 58959359ad | 1 year ago |
| 68b8f6af5f | 1 year ago |
@ -0,0 +1,138 @@
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Writing to Apache Iceberg\n",
    "\n",
    "1. Ensure you have a Google BigQuery service account key on disk\n",
    "2. The service key location is set as an environment variable **BQ_KEY**\n",
    "3. The dataset will be automatically created within the project associated with the service key\n",
    "\n",
    "The cell below creates a dataframe that will be stored within Google BigQuery"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "['data transport version ', '2.4.0']\n"
     ]
    }
   ],
   "source": [
    "#\n",
    "# Writing to Google Bigquery database\n",
    "#\n",
    "import transport\n",
    "from transport import providers\n",
    "import pandas as pd\n",
    "import os\n",
    "\n",
    "PRIVATE_KEY = os.environ['BQ_KEY'] #-- location of the service key\n",
    "DATASET = 'demo'\n",
    "_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
    "# bqw = transport.get.writer(provider=providers.ICEBERG,catalog='mz',database='edw.mz',table='friends')\n",
    "bqw = transport.get.writer(provider=providers.ICEBERG,table='edw.mz.friends')\n",
    "bqw.write(_data,if_exists='replace') #-- default is append\n",
    "print (['data transport version ', transport.__version__])\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Reading from Google BigQuery\n",
    "\n",
    "The cell below reads the data written by the cell above and computes the average age with a simple Google BigQuery query:\n",
    "\n",
    "- Basic read of the designated table (friends) created above\n",
    "- Execute an aggregate SQL query against the table\n",
    "\n",
    "**NOTE**\n",
    "\n",
    "By design, **read** objects are separated from **write** objects in order to avoid accidental writes to the database.\n",
    "Read objects are created with **transport.get.reader**, whereas write objects are created with **transport.get.writer**."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "           name  age\n",
      "0    James Bond   55\n",
      "1  Steve Rogers  150\n",
      "2  Steve Nyemba   44\n",
      "--------- STATISTICS ------------\n"
     ]
    }
   ],
   "source": [
    "\n",
    "import transport\n",
    "from transport import providers\n",
    "import os\n",
    "PRIVATE_KEY=os.environ['BQ_KEY']\n",
    "pgr = transport.get.reader(provider=providers.ICEBERG,database='edw.mz')\n",
    "_df = pgr.read(table='friends')\n",
    "_query = 'SELECT COUNT(*) _counts, AVG(age) from friends'\n",
    "_sdf = pgr.read(sql=_query)\n",
    "print (_df)\n",
    "print ('--------- STATISTICS ------------')\n",
    "# print (_sdf)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "An **auth-file** is a file that contains the database parameters used to access the database. \n",
    "For code in shared environments, we recommend:\n",
    "\n",
    "1. storing the **auth-file** on disk, and\n",
    "2. setting the location of the file as an environment variable.\n",
    "\n",
    "To generate a template of the **auth-file**, open the **file generator wizard** found at https://healthcareio.the-phi.com/data-transport"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
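The last markdown cell above describes the **auth-file** convention, but the notebook leaves the following code cell empty. Below is a minimal sketch of how such a file could be consumed; the JSON layout and the environment variable name `DATABASE_AUTH` are assumptions (the actual template comes from the wizard linked in the notebook), and only `transport.get.reader` and its keyword arguments are taken from the notebook itself.

# Hypothetical auth-file pattern: load connection parameters from a JSON file
# whose path is stored in an environment variable, then hand them to the reader.
import json
import os
import transport

# Assumed file content, e.g. {"provider": "iceberg", "database": "edw.mz"}
_auth = json.loads(open(os.environ['DATABASE_AUTH']).read())
reader = transport.get.reader(**_auth)    # parameters come from the auth-file, not the code
print(reader.read(table='friends'))       # table name as used in the notebook above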
@ -0,0 +1,62 @@
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "data-transport"
dynamic = ["version"]
authors = [
    {name="Steve L. Nyemba" , email = "info@the-phi.com"},
]
description = ""
readme = "README.md"
license = {text = "LICENSE"}
keywords = ["mongodb","duckdb","couchdb","rabbitmq","file","read","write","s3","sqlite"]
classifiers = [
    "License :: OSI Approved :: MIT License",
    "Topic :: Utilities",
]
dependencies = [
    "termcolor","sqlalchemy", "aiosqlite","duckdb-engine",
    "typer","pandas","numpy","sqlalchemy","pyarrow",
    "plugin-ix@git+https://github.com/lnyemba/plugins-ix"
]

[project.optional-dependencies]
sql = ["mysql-connector-python","psycopg2-binary","nzpy","pymssql","duckdb-engine","aiosqlite"]
nosql = ["pymongo","cloudant"]
cloud = ["pandas-gbq","google-cloud-bigquery","google-cloud-bigquery-storage", "databricks-sqlalchemy","pyncclient","boto3","boto","botocore"]
warehouse = ["pydrill","pyspark","sqlalchemy_drill"]
rabbitmq = ["pika"]
sqlite = ["aiosqlite"]
aws3 = ["boto3","boto","botocore"]
nextcloud = ["pyncclient"]
mongodb = ["pymongo"]
netezza = ["nzpy"]
mysql = ["mysql-connector-python"]
postgresql = ["psycopg2-binary"]
sqlserver = ["pymssql"]
http = ["flask-session"]
all = ["mysql-connector-python","psycopg2-binary","nzpy","pymssql","duckdb-engine","aiosqlite","pymongo","cloudant","pandas-gbq","google-cloud-bigquery","google-cloud-bigquery-storage", "databricks-sqlalchemy","pyncclient","boto3","boto","botocore","pydrill","pyspark","sqlalchemy_drill", "pika","aiosqlite","boto3","boto","botocore", "pyncclient"]

[project.urls]
Homepage = "https://healthcareio.the-phi.com/git/code/transport.git"

#[project.scripts]
#transport = "transport:main"

[tool.setuptools]
include-package-data = true
zip-safe = false
script-files = ["bin/transport"]

[tool.setuptools.packages.find]
include = ["info","info.*", "transport", "transport.*"]

[tool.setuptools.dynamic]
version = {attr = "info.__version__"}
#authors = {attr = "meta.__author__"}

# If you have a info.py file, you might also want to include the author dynamically:
# [tool.setuptools.dynamic]
# version = {attr = "info.__version__"}
# authors = {attr = "info.__author__"}
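The `[tool.setuptools.dynamic]` table resolves the package version from `info.__version__`, mirroring the attributes the removed setup.py (below) imports from the `info` package. A sketch of what that package is presumably expected to expose follows; the attribute names come from setup.py and the dynamic-version entry above, while the values are placeholders (the version string matches the notebook output).

# info/__init__.py -- sketch of the attributes the build configuration relies on.
# Attribute names are taken from the removed setup.py; the values are placeholders.
__app_name__ = 'data-transport'
__version__ = '2.4.0'            # resolved by [tool.setuptools.dynamic] at build time
__author__ = 'Steve L. Nyemba'
__license__ = 'MIT'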
@ -1,28 +0,0 @@
"""
This is a build file for the
"""
from setuptools import setup, find_packages
import os
import sys
# from version import __version__,__author__
from info import __version__, __author__,__app_name__,__license__


def read(fname):
    return open(os.path.join(os.path.dirname(__file__), fname)).read()
args = {
    "name":__app_name__,
    "version":__version__,
    "author":__author__,"author_email":"info@the-phi.com",
    "license":__license__,
    # "packages":["transport","info","transport/sql"]},

    "packages": find_packages(include=['info','transport', 'transport.*'])}
args["keywords"]=['mongodb','duckdb','couchdb','rabbitmq','file','read','write','s3','sqlite']
args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','termcolor','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql']
args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git"
args['scripts'] = ['bin/transport']
# if sys.version_info[0] == 2 :
#     args['use_2to3'] = True
#     args['use_2to3_exclude_fixers']=['lib2to3.fixes.fix_import']
setup(**args)
@ -0,0 +1,7 @@
"""
This namespace/package is intended to handle read/writes against data warehouse solutions like :
    - apache iceberg
    - clickhouse (...)
"""

from . import iceberg, drill
@ -0,0 +1,55 @@
import sqlalchemy
import pandas as pd
from ..sql.common import BaseReader, BaseWriter
import sqlalchemy as sqa


class Drill :
    __template = {'host':None,'port':None,'ssl':None,'table':None,'database':None}
    def __init__(self,**_args):

        self._host = _args['host'] if 'host' in _args else 'localhost'
        self._port = _args['port'] if 'port' in _args else self.get_default_port()
        self._ssl = False if 'ssl' not in _args else _args['ssl']

        self._table = _args['table'] if 'table' in _args else None
        if self._table and '.' in self._table :
            _seg = self._table.split('.')
            if len(_seg) > 2 :
                self._schema,self._database = _seg[:2]
        else:

            self._database=_args['database']
            self._schema = self._database.split('.')[0]

    def _get_uri(self,**_args):
        return f'drill+sadrill://{self._host}:{self._port}/{self._database}?use_ssl={self._ssl}'
    def get_provider(self):
        return "drill+sadrill"
    def get_default_port(self):
        return "8047"
    def meta(self,**_args):
        _table = _args['table'] if 'table' in _args else self._table
        if '.' in _table :
            _schema = _table.split('.')[:2]
            _schema = '.'.join(_schema)
            _table = _table.split('.')[-1]
        else:
            _schema = self._schema

        # _sql = f"select COLUMN_NAME AS name, CASE WHEN DATA_TYPE ='CHARACTER VARYING' THEN 'CHAR ( 125 )' ELSE DATA_TYPE END AS type from INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA='{_schema}' and TABLE_NAME='{_table}'"
        _sql = f"select COLUMN_NAME AS name, CASE WHEN DATA_TYPE ='CHARACTER VARYING' THEN 'CHAR ( '||COLUMN_SIZE||' )' ELSE DATA_TYPE END AS type from INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA='{_schema}' and TABLE_NAME='{_table}'"
        try:
            _df = pd.read_sql(_sql,self._engine)
            return _df.to_dict(orient='records')
        except Exception as e:
            print (e)
            pass
        return []
class Reader (Drill,BaseReader) :
    def __init__(self,**_args):
        super().__init__(**_args)
        self._chunksize = 0 if 'chunksize' not in _args else _args['chunksize']
        self._engine = sqa.create_engine(self._get_uri(),future=True)
class Writer(Drill,BaseWriter):
    def __init__(self,**_args):
        super().__init__(**_args)   #-- the original passed `self` as an extra positional argument, which raises a TypeError
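A short usage sketch for the Drill connector above; the host, port, and table names are illustrative, and row reads are inherited from `sql.common.BaseReader` (not shown in this diff), so that call is only hinted at in a comment.

# Hypothetical direct use of the Drill reader defined above.
from transport.warehouse.drill import Reader

_reader = Reader(host='localhost', port='8047', ssl=False, database='dfs.tmp')
print(_reader.meta(table='dfs.tmp.friends'))   # column names/types from INFORMATION_SCHEMA
# Reading rows comes from BaseReader (not part of this diff); assuming it follows the
# notebook convention it would look like: _reader.read(sql='SELECT * FROM dfs.tmp.friends')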
@ -0,0 +1,151 @@
"""
dependency:
    - spark and SPARK_HOME environment variable must be set
NOTE:
    When using the streaming option, ensure it is in line with the default (1000 rows) or increase it in spark-defaults.conf

"""
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import col, to_date, to_timestamp
import copy

class Iceberg :
    def __init__(self,**_args):
        """
        providing catalog meta information (you must get this from apache iceberg)
        """
        #
        # Turning off logging (it's annoying & un-professional)
        #
        # _spconf = SparkContext()
        # _spconf.setLogLevel("ERROR")
        #
        # @TODO:
        #   Make arrangements for additional configuration elements
        #
        self._session = SparkSession.builder.appName("data-transport").getOrCreate()
        self._session.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
        # self._session.sparkContext.setLogLevel("ERROR")
        self._catalog = self._session.catalog
        self._table = _args['table'] if 'table' in _args else None

        if 'catalog' in _args :
            #
            # Let us set the default catalog
            self._catalog.setCurrentCatalog(_args['catalog'])

        else:
            # No current catalog has been set ...
            pass
        if 'database' in _args :
            self._database = _args['database']
            self._catalog.setCurrentDatabase(self._database)
        else:
            #
            # Should we set the default as the first one if available ?
            #
            pass
        self._catalogName = self._catalog.currentCatalog()
        self._databaseName = self._catalog.currentDatabase()
    def meta (self,**_args) :
        """
        This function should return the schema of a table (only)
        """
        _schema = []
        try:
            _table = _args['table'] if 'table' in _args else self._table
            _tableName = self._getPrefix(**_args) + f".{_table}"
            _tmp = self._session.table(_tableName).schema
            _schema = _tmp.jsonValue()['fields']
            for _item in _schema :
                del _item['nullable'],_item['metadata']
        except Exception as e:

            pass
        return _schema
    def _getPrefix (self,**_args):
        _catName = self._catalogName if 'catalog' not in _args else _args['catalog']
        _datName = self._databaseName if 'database' not in _args else _args['database']

        return '.'.join([_catName,_datName])
    def apply(self,_query):
        """
        sql query/command to run against apache iceberg
        """
        return self._session.sql(_query).toPandas()
    def has (self,**_args):
        try:
            _prefix = self._getPrefix(**_args)
            if _prefix.endswith('.') :
                return False
            return _args['table'] in [_item.name for _item in self._catalog.listTables(_prefix)]
        except Exception as e:
            print (e)
            return False

    def close(self):
        self._session.stop()
class Reader(Iceberg) :
    def __init__(self,**_args):
        super().__init__(**_args)
    def read(self,**_args):
        _table = self._table
        _prefix = self._getPrefix(**_args)
        if 'table' in _args or _table:
            _table = _args['table'] if 'table' in _args else _table
            _table = _prefix + f'.{_table}'
            return self._session.table(_table).toPandas()
        else:
            sql = _args['sql']
            return self._session.sql(sql).toPandas()
        pass
class Writer (Iceberg):
    """
    Writing data to an Apache Iceberg data warehouse (using pyspark)
    """
    def __init__(self,**_args):
        super().__init__(**_args)
        self._mode = 'append' if 'mode' not in _args else _args['mode']
        self._table = None if 'table' not in _args else _args['table']
    def format (self,_schema) :
        _iceSchema = StructType([])
        _map = {'integer':IntegerType(),'float':DoubleType(),'double':DoubleType(),'date':DateType(),
                'timestamp':TimestampType(),'datetime':TimestampType(),'string':StringType(),'varchar':StringType()}
        for _item in _schema :
            _name = _item['name']
            _type = _item['type'].lower()
            if _type not in _map :
                _iceType = StringType()
            else:
                _iceType = _map[_type]

            _iceSchema.add (StructField(_name,_iceType,True))
        return _iceSchema if len(_iceSchema) else []
    def write(self,_data,**_args):
        _prefix = self._getPrefix(**_args)
        if 'table' not in _args and not self._table :
            raise Exception (f"Table Name should be specified for catalog/database {_prefix}")
        _schema = self.format(_args['schema']) if 'schema' in _args else []
        if not _schema :
            rdd = self._session.createDataFrame(_data,verifySchema=False)
        else :
            rdd = self._session.createDataFrame(_data,schema=_schema,verifySchema=True)
        _mode = self._mode if 'mode' not in _args else _args['mode']
        _table = self._table if 'table' not in _args else _args['table']

        # print (_data.shape,_mode,_table)

        if not self._session.catalog.tableExists(_table):
            # @TODO:
            #   add partitioning information here
            rdd.writeTo(_table).using('iceberg').create()

            # _mode = 'overwrite'
            # rdd.write.format('iceberg').mode(_mode).saveAsTable(_table)
        else:
            # rdd.writeTo(_table).append()
            # _table = f'{_prefix}.{_table}'

            rdd.coalesce(10).write.format('iceberg').mode('append').save(_table)
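The notebook exercises these classes through `transport.get`; a direct-usage sketch is shown below, reusing the catalog, database, and table names from the notebook (`mz`, `edw.mz`, `friends`). It assumes a Spark session already configured with that Iceberg catalog.

# Direct use of the Iceberg Reader/Writer defined above (names taken from the notebook).
import pandas as pd
from transport.warehouse.iceberg import Reader, Writer

_df = pd.DataFrame({'name': ['James Bond', 'Steve Rogers', 'Steve Nyemba'], 'age': [55, 150, 44]})

_writer = Writer(catalog='mz', database='edw.mz', table='friends')
_writer.write(_df)                                   # creates the table on first write, appends afterwards

_reader = Reader(catalog='mz', database='edw.mz')
print(_reader.read(table='friends'))                 # full-table read as a pandas dataframe
print(_reader.apply('SELECT COUNT(*) _counts, AVG(age) FROM friends'))
_reader.close()                                      # stops the underlying Spark session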