Compare commits


38 Commits
v2.4 ... master

Author SHA1 Message Date
Steve L. Nyemba b461ce9d7b Merge pull request 'v2.2.0' (#33) from v2.2.0 into master (3 days ago)
Steve Nyemba fbdb4a4931 bug fix: registry and emails (3 days ago)
Steve Nyemba 6e1c420952 project file specification (7 days ago)
Steve Nyemba 66d881fdda upgrade pyproject.toml, bug fix with registry (7 days ago)
Steve L. Nyemba 6c26588462 Merge pull request 'v2.2.0' (#32) from v2.2.0 into master (2 weeks ago)
Steve Nyemba de4e065ca6 bug fix with newer setuptools (2 weeks ago)
Steve Nyemba e035f5eba0 windows bug fix, environment variable (2 weeks ago)
Steve Nyemba 6f8019f582 bug fix (3 weeks ago)
Steve L. Nyemba d3517a5720 Merge pull request 'bug fix: logger issue' (#31) from v2.2.0 into master (4 weeks ago)
Steve Nyemba b0cd0b85dc bug fix: logger issue (2 months ago)
Steve L. Nyemba 4c98e81c14 Merge pull request 'v2.2.0: bug fixes' (#30) from v2.2.0 into master (3 months ago)
Steve Nyemba 4b34c746ae bug fix: missing table (3 months ago)
Steve Nyemba 0977ad1b18 setup fixes (4 months ago)
Steve Nyemba 98ef8a848e bug fixes and dependencies (4 months ago)
Steve Nyemba 469c6f89a2 fixes with plugin handler (4 months ago)
Steve Nyemba dd10f6db78 bug fix: version & cli (4 months ago)
Steve Nyemba dad2956a8c version update (4 months ago)
Steve Nyemba eaa2b99a2d bug fix: schema (postgresql) construct (4 months ago)
Steve Nyemba a1b5f2743c bug fixes ... (5 months ago)
Steve Nyemba afa442ea8d versioning update edition (5 months ago)
Steve Nyemba 30645e46bd bug fix: readonly for duckdb (5 months ago)
Steve Nyemba cdf783143e ... (5 months ago)
Steve Nyemba 1a8112f152 adding iceberg notebook (5 months ago)
Steve Nyemba 49ebd4a432 bug fix: close & etl (5 months ago)
Steve Nyemba c3627586b3 fix: refactor cli switches (6 months ago)
Steve Nyemba 2a72de4cd6 bug fixes: registry and handling cli parameters as well as adding warehousing (6 months ago)
Steve Nyemba d0e655e7e3 update, community edition baseline (8 months ago)
Steve L. Nyemba 492dc8f374 Merge pull request 'new provider console and bug fixes with applied commands' (#25) from v2.2.0 into master (10 months ago)
Steve L. Nyemba e848367378 Merge pull request 'bug fix, duckdb in-memory handling' (#24) from v2.2.0 into master (10 months ago)
Steve L. Nyemba c872ba8cc2 Merge pull request 'v2.2.0 - Bug fixes with mongodb, console' (#23) from v2.2.0 into master (10 months ago)
Steve L. Nyemba baa8164f16 Merge pull request 'aws s3 notebook, brief example' (#22) from v2.2.0 into master (1 year ago)
Steve L. Nyemba 31556ebd32 Merge pull request 'v2.2.0 bug fix - AWS-S3' (#21) from v2.2.0 into master (1 year ago)
Steve L. Nyemba 1e7839198a Merge pull request 'v2.2.0 - shared environment support and duckdb support' (#20) from v2.2.0 into master (1 year ago)
Steve L. Nyemba dce50a967e Merge pull request 'documentation ...' (#19) from v2.0.4 into master (1 year ago)
Steve L. Nyemba 5ccb073865 Merge pull request 'refactor: etl,better reusability & streamlined and threaded' (#18) from v2.0.4 into master (1 year ago)
Steve L. Nyemba 3081fb98e7 Merge pull request 'version 2.0 - Refactored, Plugins support' (#17) from v2.0 into master (1 year ago)
Steve L. Nyemba 58959359ad Merge pull request 'bug fix: psycopg2 with numpy' (#14) from dev into master (1 year ago)
Steve L. Nyemba 68b8f6af5f Merge pull request 'fixes 2024 pandas-gbq and sqlalchemy' (#10) from dev into master (1 year ago)

@@ -24,19 +24,28 @@ from multiprocessing import Process
import os
import transport
from transport import etl
# from transport import etl
from transport.iowrapper import IETL
# from transport import providers
import typer
from typing_extensions import Annotated
from typing import Optional
import time
from termcolor import colored
from enum import Enum
from rich import print
import plugin_ix as pix
app = typer.Typer()
app_e = typer.Typer() #-- handles etl (run, generate)
app_x = typer.Typer() #-- handles plugins (list,add, test)
app_i = typer.Typer() #-- handles information (version, license)
app_r = typer.Typer() #-- handles registry
REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport'])
REGISTRY_FILE= 'transport-registry.json'
CHECK_MARK = ' '.join(['[',colored(u'\u2713', 'green'),']'])
TIMES_MARK= ' '.join(['[',colored(u'\u2717','red'),']'])
CHECK_MARK = '[ [green]\u2713[/green] ]' #' '.join(['[',colored(u'\u2713', 'green'),']'])
TIMES_MARK= '[ [red]\u2717[/red] ]' #' '.join(['[',colored(u'\u2717','red'),']'])
# @app.command()
def help() :
print (__doc__)
@@ -44,10 +53,15 @@ def wait(jobs):
while jobs :
jobs = [thread for thread in jobs if thread.is_alive()]
time.sleep(1)
def wait (jobs):
while jobs :
jobs = [pthread for pthread in jobs if pthread.is_alive()]
@app.command(name="apply")
@app_e.command(name="run")
def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")],
index:int = typer.Option(default= None, help="index of the item of interest, otherwise everything in the file will be processed")):
index:int = typer.Option(default= None, help="index of the item of interest, otherwise everything in the file will be processed"),
batch:int = typer.Option(default=5, help="The number of parallel processes to run at once")
):
"""
This function applies data transport ETL feature to read data from one source to write it one or several others
"""
@@ -56,23 +70,34 @@ def apply (path:Annotated[str,typer.Argument(help="path of the configuration fil
file = open(path)
_config = json.loads (file.read() )
file.close()
if index :
if index is not None:
_config = [_config[ int(index)]]
jobs = []
for _args in _config :
pthread = etl.instance(**_args) #-- automatically starts the process
# pthread = etl.instance(**_args) #-- automatically starts the process
def bootup ():
_worker = IETL(**_args)
_worker.run()
pthread = Process(target=bootup)
pthread.start()
jobs.append(pthread)
if len(jobs) == batch :
wait(jobs)
jobs = []
if jobs :
wait (jobs)
#
# @TODO: Log the number of processes started and estimated time
while jobs :
jobs = [pthread for pthread in jobs if pthread.is_alive()]
time.sleep(1)
# @TODO: Log the number of processes started and estimated time
# while jobs :
# jobs = [pthread for pthread in jobs if pthread.is_alive()]
# time.sleep(1)
#
# @TODO: Log the job termination here ...
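For orientation, the configuration consumed by apply above is the same structure that transport.get.etl accepts programmatically. A minimal sketch, assuming a local sqlite3 source and a csv target (file names are illustrative):

import transport

_config = {
    "source": {"provider": "sqlite3", "database": "sample.db3", "table": "addresses"},
    "target": [{"provider": "files", "path": "addresses.csv", "delimiter": ","}]
}
_etl = transport.get.etl(**_config)  #-- returns an IETL instance
_etl.run()                           #-- reads the source and writes to every target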
@app.command(name="providers")
@app_i.command(name="supported")
def supported (format:Annotated[str,typer.Argument(help="format of the output, supported formats are (list,table,json)")]="table") :
"""
This function will print supported providers/vendors and their associated classifications
This function will print supported database technologies
"""
_df = (transport.supported())
if format in ['list','json'] :
@@ -80,17 +105,26 @@ def supported (format:Annotated[str,typer.Argument(help="format of the output, s
else:
print (_df)
print ()
@app.command()
@app_i.command(name="version")
def version ():
"""
This function will display version and license information
This function will return the version of data-transport
"""
print()
print (f'[bold] {transport.__app_name__} ,[blue] {transport.__edition__} edition [/blue], version {transport.__version__}[/bold]')
print ()
print (transport.__app_name__,'version ',transport.__version__)
@app_i.command(name="license")
def info():
"""
This function will display version and license information
"""
print()
print (f'[bold] {transport.__app_name__} ,{transport.__edition__}, version {transport.__version__}[/bold]')
print ()
print (transport.__license__)
@app.command()
@app_e.command()
def generate (path:Annotated[str,typer.Argument(help="path of the ETL configuration file template (name included)")]):
"""
This function will generate a configuration template to give a sense of how to create one
@@ -99,45 +133,45 @@ def generate (path:Annotated[str,typer.Argument(help="path of the ETL configurat
{
"source":{"provider":"http","url":"https://raw.githubusercontent.com/codeforamerica/ohana-api/master/data/sample-csv/addresses.csv"},
"target":
[{"provider":"files","path":"addresses.csv","delimiter":","},{"provider":"sqlite","database":"sample.db3","table":"addresses"}]
[{"provider":"files","path":"addresses.csv","delimiter":","},{"provider":"sqlite3","database":"sample.db3","table":"addresses"}]
}
]
file = open(path,'w')
file.write(json.dumps(_config))
file.close()
print (f"""{CHECK_MARK} Successfully generated a template ETL file at {path}""" )
print (f"""{CHECK_MARK} Successfully generated a template ETL file at [bold]{path}[/bold]""" )
print ("""NOTE: Each line (source or target) is the content of an auth-file""")
@app.command(name="init")
@app_r.command(name="reset")
def initregistry (email:Annotated[str,typer.Argument(help="email")],
path:str=typer.Option(default=REGISTRY_PATH,help="path or location of the configuration file"),
override:bool=typer.Option(default=False,help="override existing configuration or not")):
"""
This function will initialize the registry and have both application and calling code load the database parameters by a label
This function will initialize the data-transport registry and have both application and calling code load the database parameters by a label
"""
try:
transport.registry.init(email=email, path=path, override=override)
_msg = f"""{CHECK_MARK} Successfully wrote configuration to {path} from {email}"""
_msg = f"""{CHECK_MARK} Successfully wrote configuration to [bold]{path}[/bold] from [bold]{email}[/bold]"""
except Exception as e:
_msg = f"{TIMES_MARK} {e}"
print (_msg)
print ()
@app.command(name="register")
@app_r.command(name="add")
def register (label:Annotated[str,typer.Argument(help="unique label that will be used to load the parameters of the database")],
auth_file:Annotated[str,typer.Argument(help="path of the auth_file")],
default:bool=typer.Option(default=False,help="set the auth_file as default"),
path:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry file")):
"""
This function will register an auth-file i.e database connection and assign it a label,
Learn more about auth-file at https://healthcareio.the-phi.com/data-transport
This function adds a database label for a given auth-file, which allows access to the database using a label of your choice.
"""
try:
if transport.registry.exists(path) :
transport.registry.set(label=label,auth_file=auth_file, default=default, path=path)
_msg = f"""{CHECK_MARK} Successfully added label "{label}" to data-transport registry"""
_msg = f"""{CHECK_MARK} Successfully added label [bold]"{label}"[/bold] to data-transport registry"""
else:
_msg = f"""{TIMES_MARK} Registry is not initialized, please initialize the registry (check help)"""
except Exception as e:
@@ -145,6 +179,68 @@ def register (label:Annotated[str,typer.Argument(help="unique label that will be
print (_msg)
pass
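The registry commands above are thin wrappers over transport.registry. A minimal sketch of the equivalent library calls, with a placeholder email and auth-file path:

import transport
from transport import registry

registry.init(email='you@example.com')                      #-- one-time setup, equivalent of `registry reset`
registry.set(label='mydb', auth_file='/path/to/auth.json')  #-- equivalent of `registry add`
reader = transport.get.reader(label='mydb')                 #-- parameters are resolved from the registry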
@app_x.command(name='add')
def register_plugs (
alias:Annotated[str,typer.Argument(help="unique function name within a file")],
path:Annotated[str,typer.Argument(help="path of the python file, that contains functions")],
folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry folder"),
):
"""
This function will register a file and the functions within it that we are interested in using
"""
if ',' in alias :
alias = [_name.strip() for _name in alias.split(',') if _name.strip() != '' ]
else:
alias = [alias.strip()]
_pregistry = pix.Registry(folder=folder,plugin_folder='plugins/code')
_log = _pregistry.set(path,alias)
# transport.registry.plugins.init()
# _log = transport.registry.plugins.add(alias,path)
_mark = TIMES_MARK if not _log else CHECK_MARK
_msg = f"""Could NOT add the [bold]{alias}[/bold]to the registry""" if not _log else f""" successfully added {alias}, {_log} functions registered"""
print (f"""{_mark} {_msg}""")
@app_x.command(name="list")
def registry_list (folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport configuration folder")):
"""
This function will list all the plugins (python functions/files) that are registered and can be reused
"""
_pregistry = pix.Registry(folder=folder)
_df = _pregistry.stats()
if _df.empty :
print (f"{TIMES_MARK} registry at {folder} is not ready")
else:
print (_df)
@app_x.command ("has")
def registry_has (alias:Annotated[str,typer.Argument(help="alias of a function function@file or file.function")],
folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry file")) :
_pregistry = pix.Registry(folder=folder)
if _pregistry.has(alias) :
_msg = f"{CHECK_MARK} {alias} was [bold] found [/bold] in registry "
else:
_msg = f"{TIMES_MARK} {alias} was [bold] NOT found [/bold] in registry "
print (_msg)
@app_x.command(name="test")
def registry_test (alias:Annotated[str,typer.Argument(help="alias of a function function@file or file.function")],
folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry folder")) :
_pregistry = pix.Registry(folder=folder)
"""
This function allows testing the syntax of a plugin, i.e. alias@function
"""
# _item = transport.registry.plugins.has(key=key)
_pointer = _pregistry.get(alias) if _pregistry.has(alias) else None
if _pointer:
print (f"""{CHECK_MARK} successfully loaded [bold] {alias}[/bold] found in {folder}""")
else:
print (f"{TIMES_MARK} unable to load {alias}. Make sure it is registered")
app.add_typer(app_e,name='etl',help="This function will run etl or generate a template etl configuration file")
app.add_typer(app_r,name='registry',help='This function allows labeling database access information')
app.add_typer(app_i,name="info",help="This function will print either license or supported database technologies")
app.add_typer(app_x, name="plugins",help="This function enables add/list/test of plugins in the registry")
if __name__ == '__main__' :
app()
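For context, a plugin is a plain function, optionally decorated with transport.plugins.Plugin, that receives a dataframe and returns one. A hedged sketch of using one inline rather than through the CLI; the function below is illustrative, and it assumes the loader accepts in-memory functions the way PluginLoader.set does:

import transport
from transport.plugins import Plugin

@Plugin(name='cleanup', mode='rw')
def cleanup(_data):
    #-- plugins receive a dataframe and must return one (assumed inline registration)
    return _data.dropna()

reader = transport.get.reader(label='default', plugins=[cleanup])
_df = reader.read()  #-- cleanup is applied to the data as it is read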

@@ -1,7 +1,8 @@
__app_name__ = 'data-transport'
__author__ = 'The Phi Technology'
__version__= '2.2.6'
__version__= '2.2.22'
__email__ = "info@the-phi.com"
__edition__= 'community'
__license__=f"""
Copyright 2010 - 2024, Steve L. Nyemba
@@ -13,9 +14,10 @@ THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR I
"""
__whatsnew__=f"""version {__version__}, focuses on collaborative environments like jupyter-base servers (apache zeppelin; jupyter notebook, jupyterlab, jupyterhub)
__whatsnew__=f"""version {__version__},
1. Added support for read/write logs as well as plugins (when applied)
2. Bug fix with duckdb (adding readonly) for readers because there are issues with threads & processes
3. support for streaming data, important to use this with large volumes of data
1. simpler syntax to create readers/writers
2. auth-file registry that can be referenced using a label
3. duckdb support
"""

@@ -0,0 +1,138 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Writing to Apache Iceberg\n",
"\n",
"1. Insure you have a Google Bigquery service account key on disk\n",
"2. The service key location is set as an environment variable **BQ_KEY**\n",
"3. The dataset will be automatically created within the project associated with the service key\n",
"\n",
"The cell below creates a dataframe that will be stored within Google Bigquery"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['data transport version ', '2.4.0']\n"
]
}
],
"source": [
"#\n",
"# Writing to Google Bigquery database\n",
"#\n",
"import transport\n",
"from transport import providers\n",
"import pandas as pd\n",
"import os\n",
"\n",
"PRIVATE_KEY = os.environ['BQ_KEY'] #-- location of the service key\n",
"DATASET = 'demo'\n",
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
"# bqw = transport.get.writer(provider=providers.ICEBERG,catalog='mz',database='edw.mz',table='friends')\n",
"bqw = transport.get.writer(provider=providers.ICEBERG,table='edw.mz.friends')\n",
"bqw.write(_data,if_exists='replace') #-- default is append\n",
"print (['data transport version ', transport.__version__])\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Reading from Google Bigquery\n",
"\n",
"The cell below reads the data that has been written by the cell above and computes the average age within a Google Bigquery (simple query). \n",
"\n",
"- Basic read of the designated table (friends) created above\n",
"- Execute an aggregate SQL against the table\n",
"\n",
"**NOTE**\n",
"\n",
"By design **read** object are separated from **write** objects in order to avoid accidental writes to the database.\n",
"Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" name age\n",
"0 James Bond 55\n",
"1 Steve Rogers 150\n",
"2 Steve Nyemba 44\n",
"--------- STATISTICS ------------\n"
]
}
],
"source": [
"\n",
"import transport\n",
"from transport import providers\n",
"import os\n",
"PRIVATE_KEY=os.environ['BQ_KEY']\n",
"pgr = transport.get.reader(provider=providers.ICEBERG,database='edw.mz')\n",
"_df = pgr.read(table='friends')\n",
"_query = 'SELECT COUNT(*) _counts, AVG(age) from friends'\n",
"_sdf = pgr.read(sql=_query)\n",
"print (_df)\n",
"print ('--------- STATISTICS ------------')\n",
"# print (_sdf)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"An **auth-file** is a file that contains database parameters used to access the database. \n",
"For code in shared environments, we recommend \n",
"\n",
"1. Having the **auth-file** stored on disk \n",
"2. and the location of the file is set to an environment variable.\n",
"\n",
"To generate a template of the **auth-file** open the **file generator wizard** found at visit https://healthcareio.the-phi.com/data-transport"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.7"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
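Outside the notebook, the auth-file pattern described in the last markdown cell might look like the sketch below, assuming an AUTH_FILE environment variable that points to a JSON file of connection parameters:

import os
import transport

#-- the auth-file is a JSON document, e.g.
#-- {"provider": "postgresql", "host": "localhost", "database": "demo"}
reader = transport.get.reader(auth_file=os.environ['AUTH_FILE'], table='friends')
_df = reader.read()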

@@ -0,0 +1,62 @@
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "data-transport"
dynamic = ["version"]
authors = [
{name="Steve L. Nyemba" , email = "info@the-phi.com"},
]
description = ""
readme = "README.md"
license = {text = "LICENSE"}
keywords = ["mongodb","duckdb","couchdb","rabbitmq","file","read","write","s3","sqlite"]
classifiers = [
"License :: OSI Approved :: MIT License",
"Topic :: Utilities",
]
dependencies = [
"termcolor","sqlalchemy", "aiosqlite","duckdb-engine",
"typer","pandas","numpy","sqlalchemy","pyarrow",
"plugin-ix@git+https://github.com/lnyemba/plugins-ix"
]
[project.optional-dependencies]
sql = ["mysql-connector-python","psycopg2-binary","nzpy","pymssql","duckdb-engine","aiosqlite"]
nosql = ["pymongo","cloudant"]
cloud = ["pandas-gbq","google-cloud-bigquery","google-cloud-bigquery-storage", "databricks-sqlalchemy","pyncclient","boto3","boto","botocore"]
warehouse = ["pydrill","pyspark","sqlalchemy_drill"]
rabbitmq = ["pika"]
sqlite = ["aiosqlite"]
aws3 = ["boto3","boto","botocore"]
nextcloud = ["pyncclient"]
mongodb = ["pymongo"]
netezza = ["nzpy"]
mysql = ["mysql-connector-python"]
postgresql = ["psycopg2-binary"]
sqlserver = ["pymssql"]
http = ["flask-session"]
all = ["mysql-connector-python","psycopg2-binary","nzpy","pymssql","duckdb-engine","aiosqlite","pymongo","cloudant","pandas-gbq","google-cloud-bigquery","google-cloud-bigquery-storage", "databricks-sqlalchemy","pyncclient","boto3","boto","botocore","pydrill","pyspark","sqlalchemy_drill", "pika","aiosqlite","boto3","boto","botocore", "pyncclient"]
[project.urls]
Homepage = "https://healthcareio.the-phi.com/git/code/transport.git"
#[project.scripts]
#transport = "transport:main"
[tool.setuptools]
include-package-data = true
zip-safe = false
script-files = ["bin/transport"]
[tool.setuptools.packages.find]
include = ["info","info.*", "transport", "transport.*"]
[tool.setuptools.dynamic]
version = {attr = "info.__version__"}
#authors = {attr = "meta.__author__"}
# If you have a info.py file, you might also want to include the author dynamically:
# [tool.setuptools.dynamic]
# version = {attr = "info.__version__"}
# authors = {attr = "info.__author__"}

@@ -1,28 +0,0 @@
"""
This is a build file for the data-transport project
"""
from setuptools import setup, find_packages
import os
import sys
# from version import __version__,__author__
from info import __version__, __author__,__app_name__,__license__
def read(fname):
return open(os.path.join(os.path.dirname(__file__), fname)).read()
args = {
"name":__app_name__,
"version":__version__,
"author":__author__,"author_email":"info@the-phi.com",
"license":__license__,
# "packages":["transport","info","transport/sql"]},
"packages": find_packages(include=['info','transport', 'transport.*'])}
args["keywords"]=['mongodb','duckdb','couchdb','rabbitmq','file','read','write','s3','sqlite']
args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','termcolor','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql']
args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git"
args['scripts'] = ['bin/transport']
# if sys.version_info[0] == 2 :
# args['use_2to3'] = True
# args['use_2to3_exclude_fixers']=['lib2to3.fixes.fix_import']
setup(**args)

@@ -18,31 +18,36 @@ Source Code is available under MIT License:
"""
import numpy as np
from transport import sql, nosql, cloud, other
from transport import sql, nosql, cloud, other, warehouse
import pandas as pd
import json
import os
from info import __version__,__author__,__email__,__license__,__app_name__,__whatsnew__
from info import __version__,__author__,__email__,__license__,__app_name__,__whatsnew__,__edition__
from transport.iowrapper import IWriter, IReader, IETL
from transport.plugins import PluginLoader
from transport import providers
import copy
from transport import registry
from transport.plugins import Plugin
PROVIDERS = {}
def init():
global PROVIDERS
for _module in [cloud,sql,nosql,other] :
for _module in [cloud,sql,nosql,other,warehouse] :
for _provider_name in dir(_module) :
if _provider_name.startswith('__') or _provider_name == 'common':
continue
PROVIDERS[_provider_name] = {'module':getattr(_module,_provider_name),'type':_module.__name__}
def _getauthfile (path) :
f = open(path)
_object = json.loads(f.read())
f.close()
return _object
#
# loading the registry
if not registry.isloaded() :
registry.load()
# def _getauthfile (path) :
# f = open(path)
# _object = json.loads(f.read())
# f.close()
# return _object
def instance (**_args):
"""
This function returns an object to read from or write to a supported database provider/vendor
@@ -52,15 +57,6 @@ def instance (**_args):
kwargs These are arguments that are provider/vendor specific
"""
global PROVIDERS
# if not registry.isloaded () :
# if ('path' in _args and registry.exists(_args['path'] )) or registry.exists():
# registry.load() if 'path' not in _args else registry.load(_args['path'])
# print ([' GOT IT'])
# if 'label' in _args and registry.isloaded():
# _info = registry.get(_args['label'])
# if _info :
# #
# _args = dict(_args,**_info)
if 'auth_file' in _args:
if os.path.exists(_args['auth_file']) :
@@ -87,8 +83,6 @@ def instance (**_args):
else:
_info = registry.get()
if _info :
#
# _args = dict(_args,**_info)
_args = dict(_info,**_args) #-- we can override the registry parameters with our own arguments
if 'provider' in _args and _args['provider'] in PROVIDERS :
@@ -119,8 +113,32 @@ def instance (**_args):
# for _delegate in _params :
# loader.set(_delegate)
loader = None if 'plugins' not in _args else _args['plugins']
return IReader(_agent,loader) if _context == 'read' else IWriter(_agent,loader)
_plugins = None if 'plugins' not in _args else _args['plugins']
# if registry.has('logger') :
# _kwa = registry.get('logger')
# _lmodule = getPROVIDERS[_kwa['provider']]
if ( ('label' in _args and _args['label'] != 'logger') and registry.has('logger')):
#
# The requested label is not 'logger', so we attach a logger if one is specified in the registry
#
_kwargs = registry.get('logger')
_kwargs['context'] = 'write'
_kwargs['table'] =_module.__name__.split('.')[-1]+'_logs'
# _logger = instance(**_kwargs)
_module = PROVIDERS[_kwargs['provider']]['module']
_logger = getattr(_module,'Writer')
_logger = _logger(**_kwargs)
else:
_logger = None
_kwargs = {'agent':_agent,'plugins':_plugins,'logger':_logger}
if 'args' in _args :
_kwargs['args'] = _args['args']
# _datatransport = IReader(_agent,_plugins,_logger) if _context == 'read' else IWriter(_agent,_plugins,_logger)
_datatransport = IReader(**_kwargs) if _context == 'read' else IWriter(**_kwargs)
return _datatransport
else:
#
@@ -137,7 +155,14 @@ class get :
if not _args or ('provider' not in _args and 'label' not in _args):
_args['label'] = 'default'
_args['context'] = 'read'
return instance(**_args)
# return instance(**_args)
# _args['logger'] = instance(**{'label':'logger','context':'write','table':'logs'})
_handler = instance(**_args)
# _handler.setLogger(get.logger())
return _handler
@staticmethod
def writer(**_args):
"""
@@ -146,10 +171,26 @@ class get :
if not _args or ('provider' not in _args and 'label' not in _args):
_args['label'] = 'default'
_args['context'] = 'write'
# _args['logger'] = instance(**{'label':'logger','context':'write','table':'logs'})
_handler = instance(**_args)
#
# Implementing logging with the 'eat-your-own-dog-food' approach
# Using dependency injection to set the logger (problem with imports)
#
# _handler.setLogger(get.logger())
return _handler
@staticmethod
def logger ():
if registry.has('logger') :
_args = registry.get('logger')
_args['context'] = 'write'
return instance(**_args)
return None
@staticmethod
def etl (**_args):
if 'source' in _args and 'target' in _args :
return IETL(**_args)
else:
raise Exception ("Malformed input found, object must have both 'source' and 'target' attributes")
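One implication of the logger lookup in instance() above: registering an auth-file under the reserved label 'logger' makes every subsequently created reader or writer log its operations to a '<module>_logs' table. A minimal sketch, with a placeholder auth-file path:

import transport
from transport import registry

registry.set(label='logger', auth_file='/path/to/logs-auth.json')

reader = transport.get.reader(label='default')  #-- a log writer is attached automatically
logger = transport.get.logger()                 #-- or fetch the logger writer explicitly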

@@ -5,35 +5,28 @@ NOTE: Plugins are converted to a pipeline, so we apply a pipeline when reading o
- upon initialization we will load plugins
- on read/write we apply a pipeline (if passed as an argument)
"""
from transport.plugins import plugin, PluginLoader
from transport.plugins import Plugin, PluginLoader
import transport
from transport import providers
from multiprocessing import Process
import time
import plugin_ix
class IO:
"""
Base wrapper class for read/write and support for logs
"""
def __init__(self,_agent,plugins):
def __init__(self,**_args):
_agent = _args['agent']
plugins = _args['plugins'] if 'plugins' in _args else None
self._agent = _agent
# self._ixloader = plugin_ix.Loader () #-- must indicate where the plugin registry file is
self._ixloader = plugin_ix.Loader (registry=plugin_ix.Registry(folder=transport.registry.REGISTRY_PATH))
if plugins :
self._init_plugins(plugins)
else:
self._plugins = None
self.init_plugins(plugins)
def _init_plugins(self,_args):
"""
This function will load pipelined functions as a plugin loader
"""
if 'path' in _args and 'names' in _args :
self._plugins = PluginLoader(**_args)
else:
self._plugins = PluginLoader()
[self._plugins.set(_pointer) for _pointer in _args]
#
# @TODO: We should have a way to log what plugins are loaded and ready to use
def meta (self,**_args):
if hasattr(self._agent,'meta') :
return self._agent.meta(**_args)
@@ -42,12 +35,12 @@ class IO:
def close(self):
if hasattr(self._agent,'close') :
self._agent.close()
def apply(self):
"""
applying pre/post conditions given a pipeline expression
"""
for _pointer in self._plugins :
_data = _pointer(_data)
# def apply(self):
# """
# applying pre/post conditions given a pipeline expression
# """
# for _pointer in self._plugins :
# _data = _pointer(_data)
def apply(self,_query):
if hasattr(self._agent,'apply') :
return self._agent.apply(_query)
@@ -59,30 +52,41 @@ class IO:
pointer = getattr(self._agent,_name)
return pointer(_query)
return None
def init_plugins(self,plugins):
for _ref in plugins :
self._ixloader.set(_ref)
class IReader(IO):
"""
This is a wrapper for read functionalities
"""
def __init__(self,_agent,pipeline=None):
super().__init__(_agent,pipeline)
def __init__(self,**_args):
super().__init__(**_args)
def read(self,**_args):
if 'plugins' in _args :
self._init_plugins(_args['plugins'])
self.init_plugins(_args['plugins'])
_data = self._agent.read(**_args)
if self._plugins and self._plugins.ratio() > 0 :
_data = self._plugins.apply(_data)
# if self._plugins and self._plugins.ratio() > 0 :
# _data = self._plugins.apply(_data)
#
# output data
#
# applying the visitor design pattern
_data = self._ixloader.visitor(_data)
return _data
class IWriter(IO):
def __init__(self,_agent,pipeline=None):
super().__init__(_agent,pipeline)
def __init__(self,**_args): #_agent,pipeline=None):
super().__init__(**_args) #_agent,pipeline)
def write(self,_data,**_args):
# if 'plugins' in _args :
# self._init_plugins(_args['plugins'])
if 'plugins' in _args :
self._init_plugins(_args['plugins'])
if self._plugins and self._plugins.ratio() > 0 :
_data = self._plugins.apply(_data)
self.init_plugins(_args['plugins'])
self._ixloader.visitor(_data)
self._agent.write(_data,**_args)
#
@@ -94,7 +98,7 @@ class IETL(IReader) :
This class performs an ETL operation by inheriting a read and adding writes as pipeline functions
"""
def __init__(self,**_args):
super().__init__(transport.get.reader(**_args['source']))
super().__init__(agent=transport.get.reader(**_args['source']),plugins=None)
if 'target' in _args:
self._targets = _args['target'] if type(_args['target']) == list else [_args['target']]
else:
@@ -105,16 +109,23 @@ class IETL(IReader) :
self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess']
def read(self,**_args):
_data = super().read(**_args)
_schema = super().meta()
for _kwargs in self._targets :
if _schema :
_kwargs['schema'] = _schema
self.post(_data,**_kwargs)
return _data
def run(self) :
return self.read()
def post (self,_data,**_args) :
"""
This function performs the write operation against a single target
:_args parameters associated with writer object
"""
writer = transport.get.writer(**_args)
if 'schema' in _args :
writer.write(_data,schema=_args['schema'])
else:
writer.write(_data)
writer.close()

@@ -11,8 +11,10 @@ import importlib as IL
import importlib.util
import sys
import os
import pandas as pd
import time
class plugin :
class Plugin :
"""
Implementing a function decorator for data-transport plugins (pre/post-processing)
"""
@@ -22,8 +24,9 @@ class plugin :
:mode restrict to reader/writer
:about tell what the function is about
"""
self._name = _args['name']
self._about = _args['about']
self._name = _args['name'] if 'name' in _args else None
self._version = _args['version'] if 'version' in _args else '0.1'
self._doc = _args['doc'] if 'doc' in _args else "N/A"
self._mode = _args['mode'] if 'mode' in _args else 'rw'
def __call__(self,pointer,**kwargs):
def wrapper(_args,**kwargs):
@@ -32,57 +35,67 @@ class plugin :
# @TODO:
# add attributes to the wrapper object
#
self._name = pointer.__name__ if not self._name else self._name
setattr(wrapper,'transport',True)
setattr(wrapper,'name',self._name)
setattr(wrapper,'mode',self._mode)
setattr(wrapper,'about',self._about)
setattr(wrapper,'version',self._version)
setattr(wrapper,'doc',self._doc)
return wrapper
class PluginLoader :
"""
This class is intended to load a plugin and make it available and assess the quality of the developed plugin
"""
def __init__(self,**_args):
"""
:path location of the plugin (should be a single file)
:_names of functions to load
"""
_names = _args['names'] if 'names' in _args else None
path = _args['path'] if 'path' in _args else None
self._names = _names if type(_names) == list else [_names]
# _names = _args['names'] if 'names' in _args else None
# path = _args['path'] if 'path' in _args else None
# self._names = _names if type(_names) == list else [_names]
self._modules = {}
self._names = []
if path and os.path.exists(path) and _names:
for _name in self._names :
self._registry = _args['registry']
spec = importlib.util.spec_from_file_location('private', path)
pass
def load (self,**_args):
"""
This function loads a plugin
"""
self._modules = {}
self._names = []
path = _args ['path']
if os.path.exists(path) :
_alias = path.split(os.sep)[-1]
spec = importlib.util.spec_from_file_location(_alias, path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module) #--loads it into sys.modules
if hasattr(module,_name) :
for _name in dir(module) :
if self.isplugin(module,_name) :
self._modules[_name] = getattr(module,_name)
else:
print ([f'Found {_name}', 'not plugin'])
else:
#
# @TODO: We should log this somewhere some how
print (['skipping ',_name, hasattr(module,_name)])
pass
else:
#
# Initialization is empty
self._names = []
pass
def set(self,_pointer) :
self._module[_name] = getattr(module,_name)
# self._names [_name]
def format (self,**_args):
uri = _args['alias'],_args['name']
# def set(self,_pointer) :
def set(self,_key) :
"""
This function will set a pointer to the list of modules to be called
This should be used within the context of using the framework as a library
"""
_name = _pointer.__name__
if type(_key).__name__ == 'function':
#
# The pointer is in the code provided by the user and loaded in memory
#
_pointer = _key
_key = 'inline@'+_key.__name__
# self._names.append(_key.__name__)
else:
_pointer = self._registry.get(key=_key)
if _pointer :
self._modules[_key] = _pointer
self._names.append(_key)
self._modules[_name] = _pointer
self._names.append(_name)
def isplugin(self,module,name):
"""
This function determines if a module is a recognized plugin
@@ -107,12 +120,31 @@ class PluginLoader :
_n = len(self._names)
return len(set(self._modules.keys()) & set (self._names)) / _n
def apply(self,_data):
def apply(self,_data,_logger=[]):
_input= {}
for _name in self._modules :
try:
_input = {'action':'plugin','object':_name,'input':{'status':'PASS'}}
_pointer = self._modules[_name]
if type(_data) == list :
_data = pd.DataFrame(_data)
_brow,_bcol = list(_data.shape)
#
# @TODO: add exception handling
_data = _pointer(_data)
_input['input']['shape'] = {'rows-dropped':_brow - _data.shape[0]}
except Exception as e:
_input['input']['status'] = 'FAILED'
print (e)
time.sleep(1)
if _logger:
try:
_logger(**_input)
except Exception as e:
pass
return _data
# def apply(self,_data,_name):
# """

@@ -11,7 +11,7 @@ BIGQUERY ='bigquery'
FILE = 'file'
ETL = 'etl'
SQLITE = 'sqlite'
SQLITE = 'sqlite3'
SQLITE3= 'sqlite3'
DUCKDB = 'duckdb'
@@ -44,7 +44,9 @@ PGSQL = POSTGRESQL
AWS_S3 = 's3'
RABBIT = RABBITMQ
ICEBERG='iceberg'
APACHE_ICEBERG = 'iceberg'
DRILL = 'drill'
APACHE_DRILL = 'drill'
# QLISTENER = 'qlistener'

@@ -3,41 +3,48 @@ import json
from info import __version__
import copy
import transport
import importlib
import importlib.util
import shutil
from io import StringIO
"""
This module manages data from the registry and allows read-only access
@TODO: add property to the DATA attribute
"""
if 'HOME' in os.environ :
REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport'])
else:
REGISTRY_PATH=os.sep.join([os.environ['USERPROFILE'],'.data-transport'])
#
# This path can be overridden by an environment variable ...
#
if 'DATA_TRANSPORT_REGISTRY_PATH' in os.environ :
REGISTRY_PATH = os.environ['DATA_TRANSPORT_REGISTRY_PATH']
REGISTRY_FILE= 'transport-registry.json'
DATA = {}
def isloaded ():
return DATA not in [{},None]
def exists (path=REGISTRY_PATH) :
def exists (path=REGISTRY_PATH,_file=REGISTRY_FILE) :
"""
This function determines if there is a registry at all
"""
p = os.path.exists(path)
q = os.path.exists( os.sep.join([path,REGISTRY_FILE]))
q = os.path.exists( os.sep.join([path,_file]))
return p and q
def load (_path=REGISTRY_PATH):
def load (_path=REGISTRY_PATH,_file=REGISTRY_FILE):
global DATA
if exists(_path) :
path = os.sep.join([_path,REGISTRY_FILE])
path = os.sep.join([_path,_file])
f = open(path)
DATA = json.loads(f.read())
f.close()
def init (email,path=REGISTRY_PATH,override=False):
def init (email,path=REGISTRY_PATH,override=False,_file=REGISTRY_FILE):
"""
Initializing the registry; this will raise an exception in the event of an issue
"""
@@ -47,7 +54,7 @@ def init (email,path=REGISTRY_PATH,override=False):
_config = {"email":email,'version':__version__}
if not os.path.exists(path):
os.makedirs(path)
filename = os.sep.join([path,REGISTRY_FILE])
filename = os.sep.join([path,_file])
if not os.path.exists(filename) or override == True :
f = open(filename,'w')
@@ -62,6 +69,8 @@ def init (email,path=REGISTRY_PATH,override=False):
def lookup (label):
global DATA
return label in DATA
has = lookup
def get (label='default') :
global DATA
return copy.copy(DATA[label]) if label in DATA else {}
@@ -73,8 +82,11 @@ def set (label, auth_file, default=False,path=REGISTRY_PATH) :
if label == 'default' :
raise Exception ("""Invalid label name provided, please change the label name and use the switch""")
reg_file = os.sep.join([path,REGISTRY_FILE])
if os.path.exists (auth_file) and os.path.exists(path) and os.path.exists(reg_file):
if os.path.exists(path) and os.path.exists(reg_file):
if type(auth_file) == str and os.path.exists (auth_file) :
f = open(auth_file)
elif type(auth_file) == StringIO:
f = auth_file
_info = json.loads(f.read())
f.close()
f = open(reg_file)
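As the module shows, the registry folder defaults to HOME (or USERPROFILE on Windows)/.data-transport and honors DATA_TRANSPORT_REGISTRY_PATH. A small sketch of pointing a shared environment at an alternate location (the path is illustrative):

import os

#-- must be set before transport is imported, because the registry loads at import time
os.environ['DATA_TRANSPORT_REGISTRY_PATH'] = '/opt/shared/.data-transport'

import transport
print(transport.registry.REGISTRY_PATH)  #-- /opt/shared/.data-transport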

@@ -3,7 +3,7 @@ This file encapsulates common operations associated with SQL databases via SQLAl
"""
import sqlalchemy as sqa
from sqlalchemy import text
from sqlalchemy import text , MetaData, inspect
import pandas as pd
@@ -13,7 +13,13 @@ class Base:
self._port = None
self._database = _args['database']
self._table = _args['table'] if 'table' in _args else None
self._engine= sqa.create_engine(self._get_uri(**_args),future=True)
_uri = self._get_uri(**_args)
if type(_uri) == str :
self._engine= sqa.create_engine(_uri,future=True)
else:
_uri,_kwargs = _uri
self._engine= sqa.create_engine(_uri,**_kwargs,future=True)
def _set_uri(self,**_args) :
"""
:provider provider
@@ -34,21 +40,33 @@ class Base:
:table optional name of the table (can be fully qualified)
"""
_table = self._table if 'table' not in _args else _args['table']
_map = {'TINYINT':'INTEGER','BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'}
_schema = []
if _table :
if sqa.__version__.startswith('1.') :
_handler = sqa.MetaData(bind=self._engine)
_handler.reflect()
else:
#
# sqlalchemy's version 2.+
_handler = sqa.MetaData()
_handler.reflect(bind=self._engine)
# if _table :
# if sqa.__version__.startswith('1.') :
# _handler = sqa.MetaData(bind=self._engine)
# _handler.reflect()
# else:
# #
# # sqlalchemy's version 2.+
# _handler = sqa.MetaData()
# _handler.reflect(bind=self._engine)
# #
# # Let us extract the schema with the native types
# _map = {'BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'}
# _schema = [{"name":_attr.name,"type":_map.get(str(_attr.type),str(_attr.type))} for _attr in _handler.tables[_table].columns]
#
# Let us extract the schema with the native types
_map = {'BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'}
_schema = [{"name":_attr.name,"type":_map.get(str(_attr.type),str(_attr.type))} for _attr in _handler.tables[_table].columns]
try:
if _table :
_inspector = inspect(self._engine)
_columns = _inspector.get_columns(_table)
_schema = [{'name':column['name'],'type':_map.get(str(column['type']),str(column['type'])) } for column in _columns]
return _schema
except Exception as e:
pass
# else:
return []
def has(self,**_args):
return self.meta(**_args)
def apply(self,sql):
@@ -58,7 +76,7 @@ class Base:
@TODO: Execution of stored procedures
"""
if sql.lower().startswith('select') or sql.lower().startswith('with') :
if sql.strip().lower().startswith('select') or sql.strip().lower().startswith('with') or sql.strip().startswith('show'):
return pd.read_sql(sql,self._engine)
else:
@@ -71,6 +89,7 @@ class Base:
class SQLBase(Base):
def __init__(self,**_args):
super().__init__(**_args)
self._schema = _args.get('schema',None)
def get_provider(self):
raise Exception ("Provider Needs to be set ...")
def get_default_port(self) :
@@ -94,7 +113,11 @@ class SQLBase(Base):
# _uri = [_item.strip() for _item in _uri if _item.strip()]
# return '/'.join(_uri)
return f'{_provider}://{_host}/{_database}' if _account == '' else f'{_provider}://{_account}{_host}/{_database}'
def close(self,) :
try:
self._engine.dispose()
except :
pass
class BaseReader(SQLBase):
def __init__(self,**_args):
super().__init__(**_args)
@@ -106,6 +129,8 @@ class BaseReader(SQLBase):
sql = _args['sql']
else:
_table = _args['table'] if 'table' in _args else self._table
if self._schema and type(self._schema) == str :
_table = f'{self._schema}.{_table}'
sql = f'SELECT * FROM {_table}'
return self.apply(sql)
@@ -116,9 +141,11 @@ class BaseWriter (SQLBase):
"""
def __init__(self,**_args):
super().__init__(**_args)
def write(self,_data,**_args):
if type(_data) == dict :
_df = pd.DataFrame(_data)
_df = pd.DataFrame([_data])
elif type(_data) == list :
_df = pd.DataFrame(_data)
else:
@@ -135,5 +162,8 @@ class BaseWriter (SQLBase):
# _mode['schema'] = _args['schema']
# if 'if_exists' in _args :
# _mode['if_exists'] = _args['if_exists']
if 'schema' in _args and type(_args['schema']) == str:
self._schema = _args.get('schema',None)
if self._schema :
_mode['schema'] = self._schema
_df.to_sql(_table,self._engine,**_mode)
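Since BaseWriter now honors a string schema, set either at construction or per write call, a schema-qualified write might look like this sketch; the connection parameters are placeholders:

import pandas as pd
import transport
from transport import providers

_data = pd.DataFrame({'name': ['alice', 'bob'], 'age': [30, 40]})
writer = transport.get.writer(provider=providers.POSTGRESQL, database='demo',
                              table='people', schema='public')
writer.write(_data)  #-- rows land in public.people via pandas to_sql
writer.close()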

@@ -18,6 +18,8 @@ class Reader(Duck,BaseReader) :
def __init__(self,**_args):
Duck.__init__(self,**_args)
BaseReader.__init__(self,**_args)
def _get_uri(self,**_args):
return super()._get_uri(**_args),{'connect_args':{'read_only':True}}
class Writer(Duck,BaseWriter):
def __init__(self,**_args):
Duck.__init__(self,**_args)

@@ -0,0 +1,7 @@
"""
This namespace/package is intended to handle read/writes against data warehouse solutions like :
- apache iceberg
- clickhouse (...)
"""
from . import iceberg, drill

@@ -0,0 +1,55 @@
import sqlalchemy
import pandas as pd
from .. sql.common import BaseReader , BaseWriter
import sqlalchemy as sqa
class Drill :
__template = {'host':None,'port':None,'ssl':None,'table':None,'database':None}
def __init__(self,**_args):
self._host = _args['host'] if 'host' in _args else 'localhost'
self._port = _args['port'] if 'port' in _args else self.get_default_port()
self._ssl = False if 'ssl' not in _args else _args['ssl']
self._table = _args['table'] if 'table' in _args else None
if self._table and '.' in self._table :
_seg = self._table.split('.')
if len(_seg) > 2 :
self._schema,self._database = _seg[:2]
else:
self._database=_args['database']
self._schema = self._database.split('.')[0]
def _get_uri(self,**_args):
return f'drill+sadrill://{self._host}:{self._port}/{self._database}?use_ssl={self._ssl}'
def get_provider(self):
return "drill+sadrill"
def get_default_port(self):
return "8047"
def meta(self,**_args):
_table = _args['table'] if 'table' in _args else self._table
if '.' in _table :
_schema = _table.split('.')[:2]
_schema = '.'.join(_schema)
_table = _table.split('.')[-1]
else:
_schema = self._schema
# _sql = f"select COLUMN_NAME AS name, CASE WHEN DATA_TYPE ='CHARACTER VARYING' THEN 'CHAR ( 125 )' ELSE DATA_TYPE END AS type from INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA='{_schema}' and TABLE_NAME='{_table}'"
_sql = f"select COLUMN_NAME AS name, CASE WHEN DATA_TYPE ='CHARACTER VARYING' THEN 'CHAR ( '||COLUMN_SIZE||' )' ELSE DATA_TYPE END AS type from INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA='{_schema}' and TABLE_NAME='{_table}'"
try:
_df = pd.read_sql(_sql,self._engine)
return _df.to_dict(orient='records')
except Exception as e:
print (e)
pass
return []
class Reader (Drill,BaseReader) :
def __init__(self,**_args):
super().__init__(**_args)
self._chunksize = 0 if 'chunksize' not in _args else _args['chunksize']
self._engine= sqa.create_engine(self._get_uri(),future=True)
class Writer(Drill,BaseWriter):
def __init__(self,**_args):
super().__init__(**_args)
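Given the DRILL constant added in providers alongside this file, a Drill connection might be opened as below; a sketch, with the host and storage plugin names illustrative:

import transport
from transport import providers

#-- the drill+sadrill URI is assembled from host, port and database
reader = transport.get.reader(provider=providers.DRILL,
                              host='localhost', port='8047', database='dfs.tmp')
_df = reader.read(sql='SELECT * FROM dfs.tmp.`sample.json` LIMIT 10')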

@@ -0,0 +1,151 @@
"""
dependency:
- spark and SPARK_HOME environment variable must be set
NOTE:
When using the streaming option, ensure it is in line with the default (1000 rows) or increase it in spark-defaults.conf
"""
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import col, to_date, to_timestamp
import copy
class Iceberg :
def __init__(self,**_args):
"""
providing catalog meta information (you must get this from apache iceberg)
"""
#
# Turning off logging (it's annoying & unprofessional)
#
# _spconf = SparkContext()
# _spconf.setLogLevel("ERROR")
#
# @TODO:
# Make arrangements for additional configuration elements
#
self._session = SparkSession.builder.appName("data-transport").getOrCreate()
self._session.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
# self._session.sparkContext.setLogLevel("ERROR")
self._catalog = self._session.catalog
self._table = _args['table'] if 'table' in _args else None
if 'catalog' in _args :
#
# Let us set the default catalog
self._catalog.setCurrentCatalog(_args['catalog'])
else:
# No current catalog has been set ...
pass
if 'database' in _args :
self._database = _args['database']
self._catalog.setCurrentDatabase(self._database)
else:
#
# Should we set the default as the first one if available ?
#
pass
self._catalogName = self._catalog.currentCatalog()
self._databaseName = self._catalog.currentDatabase()
def meta (self,**_args) :
"""
This function should return the schema of a table (only)
"""
_schema = []
try:
_table = _args['table'] if 'table' in _args else self._table
_tableName = self._getPrefix(**_args) + f".{_table}"
_tmp = self._session.table(_tableName).schema
_schema = _tmp.jsonValue()['fields']
for _item in _schema :
del _item['nullable'],_item['metadata']
except Exception as e:
pass
return _schema
def _getPrefix (self,**_args):
_catName = self._catalogName if 'catalog' not in _args else _args['catalog']
_datName = self._databaseName if 'database' not in _args else _args['database']
return '.'.join([_catName,_datName])
def apply(self,_query):
"""
sql query/command to run against apache iceberg
"""
return self._session.sql(_query).toPandas()
def has (self,**_args):
try:
_prefix = self._getPrefix(**_args)
if _prefix.endswith('.') :
return False
return _args['table'] in [_item.name for _item in self._catalog.listTables(_prefix)]
except Exception as e:
print (e)
return False
def close(self):
self._session.stop()
class Reader(Iceberg) :
def __init__(self,**_args):
super().__init__(**_args)
def read(self,**_args):
_table = self._table
_prefix = self._getPrefix(**_args)
if 'table' in _args or _table:
_table = _args['table'] if 'table' in _args else _table
_table = _prefix + f'.{_table}'
return self._session.table(_table).toPandas()
else:
sql = _args['sql']
return self._session.sql(sql).toPandas()
pass
class Writer (Iceberg):
"""
Writing data to an Apache Iceberg data warehouse (using pyspark)
"""
def __init__(self,**_args):
super().__init__(**_args)
self._mode = 'append' if 'mode' not in _args else _args['mode']
self._table = None if 'table' not in _args else _args['table']
def format (self,_schema) :
_iceSchema = StructType([])
_map = {'integer':IntegerType(),'float':DoubleType(),'double':DoubleType(),'date':DateType(),
'timestamp':TimestampType(),'datetime':TimestampType(),'string':StringType(),'varchar':StringType()}
for _item in _schema :
_name = _item['name']
_type = _item['type'].lower()
if _type not in _map :
_iceType = StringType()
else:
_iceType = _map[_type]
_iceSchema.add (StructField(_name,_iceType,True))
return _iceSchema if len(_iceSchema) else []
def write(self,_data,**_args):
_prefix = self._getPrefix(**_args)
if 'table' not in _args and not self._table :
raise Exception (f"Table Name should be specified for catalog/database {_prefix}")
_schema = self.format(_args['schema']) if 'schema' in _args else []
if not _schema :
rdd = self._session.createDataFrame(_data,verifySchema=False)
else :
rdd = self._session.createDataFrame(_data,schema=_schema,verifySchema=True)
_mode = self._mode if 'mode' not in _args else _args['mode']
_table = self._table if 'table' not in _args else _args['table']
# print (_data.shape,_mode,_table)
if not self._session.catalog.tableExists(_table):
# # @TODO:
# # add partitioning information here
rdd.writeTo(_table).using('iceberg').create()
# # _mode = 'overwrite'
# # rdd.write.format('iceberg').mode(_mode).saveAsTable(_table)
else:
# rdd.writeTo(_table).append()
# # _table = f'{_prefix}.{_table}'
rdd.coalesce(10).write.format('iceberg').mode('append').save(_table)