Compare commits


31 Commits
main ... v2.2.0

Author SHA1 Message Date
Steve Nyemba 4c2efc2892 documentation ... readme
5 months ago
Steve Nyemba a31481e196 fix
6 months ago
Steve Nyemba 89d762f39a bug fixes: conditional imports
6 months ago
Steve Nyemba 6e753a1fcd bug fixes
6 months ago
Steve Nyemba 18c54d7664 bug fixes
6 months ago
Steve Nyemba f06d26f9b6 bug fixes:installer & imports
6 months ago
Steve Nyemba be10ae17d7 bug fixes: installer & registry
6 months ago
Steve Nyemba befdf453f5 bug fix: crash with etl & process
6 months ago
Steve Nyemba fbdb4a4931 bug fix: registry and emails
6 months ago
Steve Nyemba 6e1c420952 project file specification
6 months ago
Steve Nyemba 66d881fdda upgrade pyproject.toml, bug fix with registry
6 months ago
Steve Nyemba de4e065ca6 bug fix with newer setuptools
6 months ago
Steve Nyemba e035f5eba0 windows bug fix, environment variable
6 months ago
Steve Nyemba 6f8019f582 bug fix
6 months ago
Steve Nyemba b0cd0b85dc bug fix: logger issue
7 months ago
Steve Nyemba 4b34c746ae bug fix: missing table
9 months ago
Steve Nyemba 0977ad1b18 setup fixes
10 months ago
Steve Nyemba 98ef8a848e bug fixes and dependencies
10 months ago
Steve Nyemba 469c6f89a2 fixes with plugin handler
10 months ago
Steve Nyemba dd10f6db78 bug fix: version & cli
10 months ago
Steve Nyemba dad2956a8c version update
10 months ago
Steve Nyemba eaa2b99a2d bug fix: schema (postgresql) construct
10 months ago
Steve Nyemba a1b5f2743c bug fixes ...
10 months ago
Steve Nyemba afa442ea8d versioning update edition
10 months ago
Steve Nyemba 30645e46bd bug fix: readonly for duckdb
10 months ago
Steve Nyemba cdf783143e ...
10 months ago
Steve Nyemba 1a8112f152 adding iceberg notebook
11 months ago
Steve Nyemba 49ebd4a432 bug fix: close & etl
11 months ago
Steve Nyemba c3627586b3 fix: refactor cli switches
12 months ago
Steve Nyemba 2a72de4cd6 bug fixes: registry and handling cli parameters as well as adding warehousing
12 months ago
Steve Nyemba d0e655e7e3 update, community edition baseline
1 year ago

@ -13,28 +13,20 @@ Data transport is a simple framework that:
## Installation ## Installation
Within the virtual environment perform the following (this will install everything): Within the virtual environment perform the following :
pip install data-transport[all]@git+https://github.com/lnyemba/data-transport.git pip install git+https://github.com/lnyemba/data-transport.git
Options to install components in square brackets are **nosql**; **cloud**; **other** and **warehouse** Options to install components in square brackets
pip install data-transport[nosql,cloud,other,warehouse,all]@git+https://github.com/lnyemba/data-transport.git pip install data-transport[nosql,cloud,warehouse,all]@git+https://github.com/lnyemba/data-transport.git
The components available:
0. sql (default): netezza; mysql; postgresql; duckdb; sqlite3; sqlserver
1. nosql: mongodb/ferretdb; couchdb
2. cloud: s3; bigquery; databricks
3. other: files; http; rabbitmq
4. warehouse: apache drill; apache iceberg
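As a quick post-install sanity check, the snippet below (a minimal sketch, not part of the README; it mirrors the version print used in the notebooks later in this compare) confirms the package and its provider constants are importable:

    # confirm the package is importable and report its version
    import transport
    from transport import providers   # provider constants for the optional components
    print (['data transport version ', transport.__version__])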
## Additional features ## Additional features
- Reads are separated from writes to avoid accidental writes. - In addition to read/write, there is support for functions for pre/post processing
- Streaming (for large volumes of data) by specifying chunksize
- CLI interface to add to registry, run ETL - CLI interface to add to registry, run ETL
- Implements best-practices for collaborative environments like apache zeppelin; jupyterhub; SageMaker; ... - scales and integrates into shared environments like apache zeppelin; jupyterhub; SageMaker; ...
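The read/write separation, registry labels and chunksize streaming listed above combine roughly as in the sketch below. It reflects the main-branch side of this compare (chunksize support is removed on the v2.2.0 side); the registry label names are illustrative, and the keyword semantics are inferred from the notebook and sql/common.py changes further down rather than from official documentation.

    import transport

    # readers and writers are distinct objects, so a reader can never accidentally write
    reader = transport.get.reader(label='mydb')          # 'mydb' is a hypothetical registry label
    writer = transport.get.writer(label='mywarehouse')   # 'mywarehouse' is also hypothetical

    # streaming: with chunksize, read() yields DataFrame segments instead of a single frame
    for _segment in reader.read(table='friends', chunksize=1000):
        writer.write(_segment)
    reader.close()
    writer.close()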
## Learn More ## Learn More

@ -53,8 +53,10 @@ def wait(jobs):
while jobs : while jobs :
jobs = [thread for thread in jobs if thread.is_alive()] jobs = [thread for thread in jobs if thread.is_alive()]
time.sleep(1) time.sleep(1)
def job (_args): # def wait (jobs):
pass # while jobs :
# jobs = [pthread for pthread in jobs if pthread.is_alive()]
@app_e.command(name="run") @app_e.command(name="run")
def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")], def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")],
index:int = typer.Option(default= None, help="index of the item of interest, otherwise everything in the file will be processed"), index:int = typer.Option(default= None, help="index of the item of interest, otherwise everything in the file will be processed"),
@ -157,10 +159,6 @@ def initregistry (email:Annotated[str,typer.Argument(help="email")],
_msg = f"{TIMES_MARK} {e}" _msg = f"{TIMES_MARK} {e}"
print (_msg) print (_msg)
print () print ()
@app_r.command(name="add") @app_r.command(name="add")
def register (label:Annotated[str,typer.Argument(help="unique label that will be used to load the parameters of the database")], def register (label:Annotated[str,typer.Argument(help="unique label that will be used to load the parameters of the database")],
auth_file:Annotated[str,typer.Argument(help="path of the auth_file")], auth_file:Annotated[str,typer.Argument(help="path of the auth_file")],
@ -179,36 +177,8 @@ def register (label:Annotated[str,typer.Argument(help="unique label that will be
except Exception as e: except Exception as e:
_msg = f"""{TIMES_MARK} {e}""" _msg = f"""{TIMES_MARK} {e}"""
print (_msg) print (_msg)
@app_r.command(name="template")
def template(name:Annotated[str,typer.Argument(help="database technology provider" ) ]):
"""
This function will generate a template entry for the registry (content of an auth file)
"""
#
# retrieve the provider and display the template if it has one
for _module in ['sql','cloud','warehouse','nosql','other'] :
ref = getattr(transport,_module) if hasattr(transport,_module) else None
_entry = {}
if ref :
if hasattr(ref,name) :
_pointer = getattr(ref,name)
_entry = dict({'provider':name},**_pointer.template()) if hasattr(_pointer,'template') else {}
break
#
#
print ( json.dumps(_entry))
pass
@app_r.command(name="list")
def register_list ():
"""
This function will list existing registry entries and basic information {label,vendor}
"""
# print (transport.registry.DATA)
_reg = transport.registry.DATA
_data = [{'label':key,'provider':_reg[key]['provider']} for key in _reg if 'provider' in _reg[key]]
_data = pd.DataFrame(_data)
print (_data)
pass
@app_x.command(name='add') @app_x.command(name='add')
def register_plugs ( def register_plugs (
alias:Annotated[str,typer.Argument(help="unique function name within a file")], alias:Annotated[str,typer.Argument(help="unique function name within a file")],

@ -1,2 +0,0 @@
cd /D "%~dp0"
python transport %1 %2 %3 %4 %5 %6

@ -1,8 +1,8 @@
__app_name__ = 'data-transport' __app_name__ = 'data-transport'
__author__ = 'The Phi Technology' __author__ = 'The Phi Technology'
__version__= '2.4.30' __version__= '2.2.22'
__edition__= 'enterprise'
__email__ = "info@the-phi.com" __email__ = "info@the-phi.com"
__edition__= 'community'
__license__=f""" __license__=f"""
Copyright 2010 - 2024, Steve L. Nyemba Copyright 2010 - 2024, Steve L. Nyemba
@ -20,6 +20,4 @@ __whatsnew__=f"""version {__version__},
3. support for streaming data, important to use this with large volumes of data 3. support for streaming data, important to use this with large volumes of data
""" """

@ -0,0 +1,138 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Writing to Apache Iceberg\n",
"\n",
"1. Ensure you have a Google Bigquery service account key on disk\n",
"2. The service key location is set as an environment variable **BQ_KEY**\n",
"3. The dataset will be automatically created within the project associated with the service key\n",
"\n",
"The cell below creates a dataframe that will be stored within Apache Iceberg"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['data transport version ', '2.4.0']\n"
]
}
],
"source": [
"#\n",
"# Writing to Google Bigquery database\n",
"#\n",
"import transport\n",
"from transport import providers\n",
"import pandas as pd\n",
"import os\n",
"\n",
"PRIVATE_KEY = os.environ['BQ_KEY'] #-- location of the service key\n",
"DATASET = 'demo'\n",
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
"# bqw = transport.get.writer(provider=providers.ICEBERG,catalog='mz',database='edw.mz',table='friends')\n",
"bqw = transport.get.writer(provider=providers.ICEBERG,table='edw.mz.friends')\n",
"bqw.write(_data,if_exists='replace') #-- default is append\n",
"print (['data transport version ', transport.__version__])\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Reading from Apache Iceberg\n",
"\n",
"The cell below reads the data written by the cell above and computes the average age with a simple query. \n",
"\n",
"- Basic read of the designated table (friends) created above\n",
"- Execute an aggregate SQL against the table\n",
"\n",
"**NOTE**\n",
"\n",
"By design, **read** objects are separated from **write** objects in order to avoid accidental writes to the database.\n",
"Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" name age\n",
"0 James Bond 55\n",
"1 Steve Rogers 150\n",
"2 Steve Nyemba 44\n",
"--------- STATISTICS ------------\n"
]
}
],
"source": [
"\n",
"import transport\n",
"from transport import providers\n",
"import os\n",
"PRIVATE_KEY=os.environ['BQ_KEY']\n",
"pgr = transport.get.reader(provider=providers.ICEBERG,database='edw.mz')\n",
"_df = pgr.read(table='friends')\n",
"_query = 'SELECT COUNT(*) _counts, AVG(age) from friends'\n",
"_sdf = pgr.read(sql=_query)\n",
"print (_df)\n",
"print ('--------- STATISTICS ------------')\n",
"# print (_sdf)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"An **auth-file** is a file that contains database parameters used to access the database. \n",
"For code in shared environments, we recommend \n",
"\n",
"1. Having the **auth-file** stored on disk, \n",
"2. and setting its location in an environment variable.\n",
"\n",
"To generate a template of the **auth-file**, open the **file generator wizard** at https://healthcareio.the-phi.com/data-transport"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.7"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
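To make the auth-file note in the notebook above concrete: judging from the template() helpers visible further down in this compare (the mongodb provider, for example), an auth-file is simply a JSON document of connection parameters. The sketch below is illustrative only; the file name, environment-variable name and credential values are made up, and the CLI sub-command in the trailing comment is assumed from the typer commands shown earlier.

    import json, os

    # hypothetical auth-file, modeled on the mongodb template() shown later in this compare
    _auth = {'provider':'mongodb','host':'localhost','port':27017,
             'db':'demo','collection':'friends','username':'user','password':'secret'}
    with open('mongo-auth.json','w') as f:
        json.dump(_auth,f)
    os.environ['MONGO_AUTH'] = 'mongo-auth.json'   # keep only the file location in shared code

    # the registry CLI can then load these parameters under a label, e.g.
    #   transport registry add mymongo mongo-auth.json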

@ -14,7 +14,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": null, "execution_count": 1,
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
{ {
@ -58,7 +58,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": null, "execution_count": 2,
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
{ {
@ -103,7 +103,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": null, "execution_count": 4,
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
{ {
@ -131,28 +131,16 @@
] ]
}, },
{ {
"cell_type": "markdown", "cell_type": "code",
"metadata": {}, "execution_count": null,
"source": [
"#### Streaming Large Volumes of Data\n",
"\n",
"For large volumes of data, it is recommended to stream the data using **chunksize** as a parameter \n",
"\n",
"1. in the **read** method \n",
"2. or **transport.get.reader(\\*\\*...,chunksize=1000)**\n",
"\n",
"Use streaming because, with large volumes of data, some databases limit the volume of data per transaction in order to maintain **data integrity**"
]
},
{
"cell_type": "markdown",
"metadata": {}, "metadata": {},
"outputs": [],
"source": [] "source": []
} }
], ],
"metadata": { "metadata": {
"kernelspec": { "kernelspec": {
"display_name": "python (3.10.12)", "display_name": "Python 3",
"language": "python", "language": "python",
"name": "python3" "name": "python3"
}, },
@ -166,7 +154,7 @@
"name": "python", "name": "python",
"nbconvert_exporter": "python", "nbconvert_exporter": "python",
"pygments_lexer": "ipython3", "pygments_lexer": "ipython3",
"version": "3.10.12" "version": "3.9.7"
} }
}, },
"nbformat": 4, "nbformat": 4,

@ -39,14 +39,14 @@ Homepage = "https://healthcareio.the-phi.com/git/code/transport.git"
[tool.setuptools] [tool.setuptools]
include-package-data = true include-package-data = true
zip-safe = false zip-safe = false
script-files = ["bin/transport","bin/transport.cmd"] script-files = ["bin/transport"]
[tool.setuptools.packages.find] [tool.setuptools.packages.find]
include = [ "transport", "transport.*"] include = ["info","info.*", "transport", "transport.*"]
[tool.setuptools.dynamic] [tool.setuptools.dynamic]
version = {attr = "transport.info.__version__"} version = {attr = "info.__version__"}
#authors = {attr = "transport.__author__"} #authors = {attr = "meta.__author__"}
# If you have a info.py file, you might also want to include the author dynamically: # If you have a info.py file, you might also want to include the author dynamically:
# [tool.setuptools.dynamic] # [tool.setuptools.dynamic]

@ -31,17 +31,18 @@ except Exception as e:
try: try:
from transport import warehouse from transport import warehouse
except Exception as e: except Exception as e:
warehouse= {} warehouse = {}
try: try:
from transport import other from transport import other
except Exception as e: except Exception as e:
other = {} other = {}
import pandas as pd import pandas as pd
import json import json
import os import os
from transport.info import __version__,__author__,__email__,__license__,__app_name__,__whatsnew__,__edition__ from info import __version__,__author__,__email__,__license__,__app_name__,__whatsnew__,__edition__
from transport.iowrapper import IWriter, IReader, IETL from transport.iowrapper import IWriter, IReader, IETL
from transport.plugins import PluginLoader from transport.plugins import PluginLoader
from transport import providers from transport import providers

@ -14,11 +14,7 @@ import numpy as np
import time import time
MAX_CHUNK = 2000000 MAX_CHUNK = 2000000
def template ():
return {'provider':'bigquery','private_key':'path-to-key','dataset':'name-of-dataset','table':'table','chunksize':MAX_CHUNK}
class BigQuery: class BigQuery:
__template__= {"private_key":None,"dataset":None,"table":None}
def __init__(self,**_args): def __init__(self,**_args):
path = _args['service_key'] if 'service_key' in _args else _args['private_key'] path = _args['service_key'] if 'service_key' in _args else _args['private_key']
self.credentials = service_account.Credentials.from_service_account_file(path) self.credentials = service_account.Credentials.from_service_account_file(path)
@ -27,7 +23,6 @@ class BigQuery:
self.dtypes = _args['dtypes'] if 'dtypes' in _args else None self.dtypes = _args['dtypes'] if 'dtypes' in _args else None
self.table = _args['table'] if 'table' in _args else None self.table = _args['table'] if 'table' in _args else None
self.client = bq.Client.from_service_account_json(self.path) self.client = bq.Client.from_service_account_json(self.path)
self._chunksize = int(_args['chunksize']) if 'chunksize' in _args else None
def meta(self,**_args): def meta(self,**_args):
""" """
This function returns meta data for a given table or query with dataset/table properly formatted This function returns meta data for a given table or query with dataset/table properly formatted
@ -86,13 +81,6 @@ class Reader (BigQuery):
if (':dataset' in SQL or ':DATASET' in SQL) and self.dataset: if (':dataset' in SQL or ':DATASET' in SQL) and self.dataset:
SQL = SQL.replace(':dataset',self.dataset).replace(':DATASET',self.dataset) SQL = SQL.replace(':dataset',self.dataset).replace(':DATASET',self.dataset)
_info = {'credentials':self.credentials,'dialect':'standard'} _info = {'credentials':self.credentials,'dialect':'standard'}
#
# @Ent-Feature : adding streaming capability here
#
if 'chunksize' in _args :
self._chunksize = int(_args['chunksize'])
if self._chunksize :
_info['chunksize'] = self._chunksize
return pd_gbq.read_gbq(SQL,**_info) if SQL else None return pd_gbq.read_gbq(SQL,**_info) if SQL else None
# return self.client.query(SQL).to_dataframe() if SQL else None # return self.client.query(SQL).to_dataframe() if SQL else None

@ -17,8 +17,6 @@ import sqlalchemy
# from transport.common import Reader,Writer # from transport.common import Reader,Writer
import pandas as pd import pandas as pd
def template ():
return {'provider':'databricks','host':'fqn-host','token':'token','cluster_path':'path-of-cluster','catalog':'name-of-catalog','database':'schema-or-database','table':'table','chunksize':10000}
class Bricks: class Bricks:
""" """
@ -28,7 +26,6 @@ class Bricks:
:cluster_path :cluster_path
:table :table
""" """
__template__ = {"host":None,"token":None,"cluster_path":None,"catalog":None,"schema":None}
def __init__(self,**_args): def __init__(self,**_args):
_host = _args['host'] _host = _args['host']
_token= _args['token'] _token= _args['token']
@ -44,7 +41,6 @@ class Bricks:
_uri = f'''databricks+connector://token:{_token}@{_host}?http_path={_cluster_path}&catalog={_catalog}&schema={self._schema}''' _uri = f'''databricks+connector://token:{_token}@{_host}?http_path={_cluster_path}&catalog={_catalog}&schema={self._schema}'''
self._engine = sqlalchemy.create_engine (_uri) self._engine = sqlalchemy.create_engine (_uri)
self._chunksize = int(_args['chunksize']) if 'chunksize' in _args else None
pass pass
def meta(self,**_args): def meta(self,**_args):
table = _args['table'] if 'table' in _args else self._table table = _args['table'] if 'table' in _args else self._table
@ -67,14 +63,7 @@ class Bricks:
def apply(self,_sql): def apply(self,_sql):
try: try:
if _sql.lower().startswith('select') : if _sql.lower().startswith('select') :
# return pd.read_sql(_sql,self._engine)
# @ENT-Feature: adding streaming functions/variables
if not self._chunksize :
return pd.read_sql(_sql,self._engine)
else:
return pd.read_sql(_sql,self._engine,chunksize=self._chunksize)
except Exception as e: except Exception as e:
pass pass
@ -94,10 +83,7 @@ class Reader(Bricks):
sql = f'SELECT * FROM {table}' sql = f'SELECT * FROM {table}'
if limit : if limit :
sql = sql + f' LIMIT {limit}' sql = sql + f' LIMIT {limit}'
#
# @ENT-Feature: adding streaming functions/variables
if 'chunksize' in _args :
self._chunksize = int(_args['chunksize'])
if 'sql' in _args or 'table' in _args : if 'sql' in _args or 'table' in _args :
return self.apply(sql) return self.apply(sql)
else: else:

@ -8,10 +8,8 @@ import pandas as pd
from io import StringIO from io import StringIO
import json import json
import nextcloud_client as nextcloud import nextcloud_client as nextcloud
def template():
return {"url":None,"token":None,"uid":None,"file":None}
class Nextcloud : class Nextcloud :
__template__={"url":None,"token":None,"uid":None,"file":None}
def __init__(self,**_args): def __init__(self,**_args):
pass pass
self._delimiter = None self._delimiter = None

@ -20,14 +20,10 @@ from io import StringIO
import pandas as pd import pandas as pd
import json import json
def template():
return {'access_key':'access-key','secret_key':'secret-key','region':'region','bucket':'name-of-bucket','file':'file-name','chunksize':10000}
class s3 : class s3 :
""" """
@TODO: Implement a search function for a file given a bucket?? @TODO: Implement a search function for a file given a bucket??
""" """
__template__={"access_key":None,"secret_key":None,"bucket":None,"file":None,"region":None}
def __init__(self,**args) : def __init__(self,**args) :
""" """
This function will extract a file or set of files from s3 bucket provided This function will extract a file or set of files from s3 bucket provided
@ -41,7 +37,6 @@ class s3 :
self._bucket_name = args['bucket'] self._bucket_name = args['bucket']
self._file_name = args['file'] self._file_name = args['file']
self._region = args['region'] self._region = args['region']
self._chunksize = int(args['chunksize']) if 'chunksize' in args else None
except Exception as e : except Exception as e :
print (e) print (e)
pass pass
@ -93,10 +88,7 @@ class Reader(s3) :
if not _stream : if not _stream :
return None return None
if _object['ContentType'] in ['text/csv'] : if _object['ContentType'] in ['text/csv'] :
if not self._chunksize : return pd.read_csv(StringIO(str(_stream).replace("\\n","\n").replace("\\r","").replace("\'","")))
return pd.read_csv(StringIO(str(_stream).replace("\\n","\n").replace("\\r","").replace("\'","")))
else:
return pd.read_csv(StringIO(str(_stream).replace("\\n","\n").replace("\\r","").replace("\'","")),chunksize=self._chunksize)
else: else:
return _stream return _stream

@ -0,0 +1,19 @@
"""
This file is intended to handle the duckdb database
"""
import duckdb
from transport.common import Reader,Writer
class Duck(Reader):
def __init__(self,**_args):
super().__init__(**_args)
self._path = None if 'path' not in _args else _args['path']
self._handler = duckdb.connect() if not self._path else duckdb.connect(self._path)
class DuckReader(Duck) :
def __init__(self,**_args):
super().__init__(**_args)
def read(self,**_args) :
pass

@ -1,25 +0,0 @@
__app_name__ = 'data-transport'
__author__ = 'Steve L. Nyemba'
__version__= '2.4.34'
__edition__= 'enterprise'
__email__ = "info@the-phi.com"
__license__=f"""
Copyright 2010 - 2024, Steve L. Nyemba
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the Software), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED AS IS, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
__whatsnew__=f"""version {__version__},
1. Added support for read/write logs as well as plugins (when applied)
2. Bug fix with duckdb (adding readonly) for readers because there are issues with threads & processes
3. support for streaming data, important to use this with large volumes of data
"""

@ -5,113 +5,42 @@ NOTE: Plugins are converted to a pipeline, so we apply a pipeline when reading o
- upon initialization we will load plugins - upon initialization we will load plugins
- on read/write we apply a pipeline (if passed as an argument) - on read/write we apply a pipeline (if passed as an argument)
""" """
from transport.plugins import PluginLoader from transport.plugins import Plugin, PluginLoader
import transport import transport
from transport import providers from transport import providers
from multiprocessing import Process, RLock from multiprocessing import Process
import time import time
import types
from . import registry
from datetime import datetime
import pandas as pd
import numpy as np
import os
import sys
import itertools
import json
import plugin_ix import plugin_ix
class BaseIO : class IO:
def __init__(self,**_args):
self._logger = _args['logger'] if 'logger' in _args else None
self._logTable = 'logs' if 'logTable' not in _args else _args['logTable']
def setLogger(self,_logger):
self._logger = _logger
def log (self,**_args):
if self._logger :
_date = str(datetime.now())
_data = dict({'pid':os.getpid(),'date':_date[:10],'time':_date[11:19]},**_args)
for key in _data :
if type(_data[key]) == list :
_data[key] = [_item.__name__ if type(_item).__name__== 'function' else _item for _item in _data[key]]
_data[key] = str(_data[key]) if type(_data[key]) not in [list,dict] else json.dumps(_data[key])
self._logger.write(pd.DataFrame([_data])) #,table=self._logTable)
class IO(BaseIO):
""" """
Base wrapper class for read/write and support for logs Base wrapper class for read/write and support for logs
""" """
def __init__(self,**_args): def __init__(self,**_args):
#
# We need to initialize the logger here ...
#
super().__init__(**_args)
_agent = _args['agent'] _agent = _args['agent']
plugins = _args['plugins'] if 'plugins' else None plugins = _args['plugins'] if 'plugins' in _args else None
# _logger = _args['logger'] if 'logger' in _args else None
# self._logger = _logger if not type(_agent) in [IReader,IWriter] else _agent._logger #transport.get.writer(label='logger') #if registry.has('logger') else None
# if not _logger and hasattr(_agent,'_logger') :
# self._logger = getattr(_agent,'_logger')
self._agent = _agent self._agent = _agent
_date = _date = str(datetime.now()) # self._ixloader = plugin_ix.Loader () #-- must indicate where the plugin registry file is
self._ixloader = plugin_ix.Loader (registry=plugin_ix.Registry(folder=transport.registry.REGISTRY_PATH)) self._ixloader = plugin_ix.Loader (registry=plugin_ix.Registry(folder=transport.registry.REGISTRY_PATH))
# self._logTable = 'logs' #'_'.join(['logs',_date[:10]+_date[11:19]]).replace(':','').replace('-','_')
if plugins : if plugins :
self.init_plugins(plugins) self.init_plugins(plugins)
# def setLogger(self,_logger):
# self._logger = _logger
# def log (self,**_args):
# if self._logger :
# _date = str(datetime.now())
# _data = dict({'pid':os.getpid(),'date':_date[:10],'time':_date[11:19]},**_args)
# for key in _data :
# if type(_data[key]) == list :
# _data[key] = [_item.__name__ if type(_item).__name__== 'function' else _item for _item in _data[key]]
# _data[key] = str(_data[key]) if type(_data[key]) not in [list,dict] else json.dumps(_data[key])
# self._logger.write(pd.DataFrame([_data])) #,table=self._logTable)
# def _init_plugins(self,_items):
# """
# This function will load pipelined functions as a plugin loader
# """
# registry.plugins.init()
# self._plugins = PluginLoader(registry=registry.plugins)
# [self._plugins.set(_name) for _name in _items]
# self.log(action='init-plugins',object=self.getClassName(self),input =[_name for _name in _items])
# # if 'path' in _args and 'names' in _args :
# # self._plugins = PluginLoader(**_args)
# # else:
# # self._plugins = PluginLoader(registry=registry.plugins)
# # [self._plugins.set(_pointer) for _pointer in _args]
# #
# # @TODO: We should have a way to log what plugins are loaded and ready to use
def meta (self,**_args): def meta (self,**_args):
if hasattr(self._agent,'meta') : if hasattr(self._agent,'meta') :
return self._agent.meta(**_args) return self._agent.meta(**_args)
return [] return []
def getClassName (self,_object):
return '.'.join([_object.__class__.__module__,_object.__class__.__name__])
def close(self): def close(self):
if hasattr(self._agent,'close') : if hasattr(self._agent,'close') :
self._agent.close() self._agent.close()
def apply(self): # def apply(self):
""" # """
applying pre/post conditions given a pipeline expression # applying pre/post conditions given a pipeline expression
""" # """
for _pointer in self._plugins : # for _pointer in self._plugins :
_data = _pointer(_data) # _data = _pointer(_data)
time.sleep(1)
def apply(self,_query): def apply(self,_query):
if hasattr(self._agent,'apply') : if hasattr(self._agent,'apply') :
return self._agent.apply(_query) return self._agent.apply(_query)
@ -133,158 +62,70 @@ class IReader(IO):
""" """
def __init__(self,**_args): def __init__(self,**_args):
super().__init__(**_args) super().__init__(**_args)
self._args = _args['args']if 'args' in _args else None
def _stream (self,_data ):
# self.log(action='streaming',object=self._agent._engine.name, input= type(_data).__name__)
_shape = []
for _segment in _data :
_shape += list(_segment.shape)
if self._plugins :
# yield self._plugins.apply(_segment,self.log)
yield self._ixloader.visitor(_data,self.log)
else:
yield _segment
_objectName = '.'.join([self._agent.__class__.__module__,self._agent.__class__.__name__])
_input = {'shape':_shape}
if hasattr(self._agent,'_table') :
_input['table'] = self._agent._table
self.log(action='streaming',object=_objectName, input= _input)
def read(self,**_args): def read(self,**_args):
if 'plugins' in _args : if 'plugins' in _args :
self.init_plugins(_args['plugins']) self.init_plugins(_args['plugins'])
if self._args : _data = self._agent.read(**_args)
_data = self._agent.read(**self._args) # if self._plugins and self._plugins.ratio() > 0 :
else: # _data = self._plugins.apply(_data)
_data = self._agent.read(**_args) #
# output data
_objectName = '.'.join([self._agent.__class__.__module__,self._agent.__class__.__name__])
if types.GeneratorType == type(_data):
return self._stream(_data)
# if self._plugins :
# return self._stream(_data)
# else:
# _count = 0
# for _segment in _data :
# _count += 1
# yield _segment
# self.log(action='streaming',object=_objectName, input= {'segments':_count})
# return _data
elif type(_data) == pd.DataFrame :
_shape = _data.shape #[0,0] if not _data.shape[] else list(_data.shape)
_input = {'shape':_shape}
if hasattr(self._agent,'_table') :
_input['table'] = self._agent._table
self.log(action='read',object=_objectName, input=_input)
_data = self._ixloader.visitor(_data)
return _data
#
# applying the the design pattern
_data = self._ixloader.visitor(_data)
return _data
class IWriter(IO): class IWriter(IO):
lock = RLock() def __init__(self,**_args): #_agent,pipeline=None):
def __init__(self,**_args): super().__init__(**_args) #_agent,pipeline)
super().__init__(**_args)
def write(self,_data,**_args): def write(self,_data,**_args):
# if 'plugins' in _args :
# self._init_plugins(_args['plugins'])
if 'plugins' in _args : if 'plugins' in _args :
self._init_plugins(_args['plugins']) self.init_plugins(_args['plugins'])
# if self._plugins and self._plugins.ratio() > 0 :
# _logs = []
# _data = self._plugins.apply(_data,_logs,self.log)
# [self.log(**_item) for _item in _logs] self._ixloader.visitor(_data)
try: self._agent.write(_data,**_args)
# IWriter.lock.acquire()
_data = self._ixloader.visitor(_data)
self._agent.write(_data,**_args)
finally:
# IWriter.lock.release()
pass
# #
# The ETL object in its simplest form is an aggregation of read/write objects # The ETL object in its simplest form is an aggregation of read/write objects
# @TODO: ETL can/should aggregate a writer as a plugin and apply it as a process # @TODO: ETL can/should aggregate a writer as a plugin and apply it as a process
class IETL(BaseIO) : class IETL(IReader) :
""" """
This class performs an ETL operation by inheriting a read and adding writes as pipeline functions This class performs an ETL operation by inheriting a read and adding writes as pipeline functions
""" """
def __init__(self,**_args): def __init__(self,**_args):
super().__init__(agent=transport.get.reader(**_args['source']),plugins=None)
super().__init__() if 'target' in _args:
self._source = _args['source'] self._targets = _args['target'] if type(_args['target']) == list else [_args['target']]
self._targets= _args['target'] if type(_args['target']) == list else [_args['target']]
#
# ETL Initialization, we should provide some measure of context ...
#
# def run(self) :
# """
# We should apply the etl here, if we are in multiprocessing mode
# """
# return self.read()
def run(self,**_args):
# _data = super().read(**_args) if not self._sourceArgs else super().read(**self._sourceArgs)
# self._targets = [transport.get.writer(**_kwargs) for _kwargs in self._targets]
_reader = transport.get.reader(**self._source)
if hasattr(_reader,'_logger') :
self.setLogger(_reader._logger)
self.log(action='init-etl',input={'source':self._source,'target':self._targets})
_data = _reader.read(**self._source['args'])if 'args' in self._source else _reader.read()
_reader.close()
_writers = [transport.get.writer(**_kwargs) for _kwargs in self._targets]
# _schema = [] if not getattr(_reader._agent,'_table') else _reader.meta()
_schema = [] if not hasattr(_reader._agent,'_table') else _reader.meta()
if types.GeneratorType == type(_data):
_index = 0
for _segment in _data :
_index += 1
for _writer in _writers :
self.post(_segment,writer=_writer,index=_index,schema=_schema)
time.sleep(1)
else: else:
self._targets = []
self.jobs = []
#
# If the parent is already multiprocessing
self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess']
def read(self,**_args):
_data = super().read(**_args)
_schema = super().meta()
for _kwargs in self._targets :
if _schema :
_kwargs['schema'] = _schema
self.post(_data,**_kwargs)
for _writer in _writers :
self.post(_data,writer=_writer,schema=_schema)
# pass
return _data return _data
# return _data def run(self) :
return self.read()
def post (self,_data,**_args) : def post (self,_data,**_args) :
""" """
This function returns an instance of a process that will perform the write operation This function returns an instance of a process that will perform the write operation
:_args parameters associated with writer object :_args parameters associated with writer object
""" """
#writer = transport.get.writer(**_args) writer = transport.get.writer(**_args)
_input = {} if 'schema' in _args :
try: writer.write(_data,schema=_args['schema'])
_action = 'post' else:
_shape = dict(zip(['rows','columns'],_data.shape))
_index = _args['index'] if 'index' in _args else 0
writer = _args['writer']
_schema= _args['schema']
#
# -- things to log
_input = {'shape':_shape,'segment':_index}
if hasattr(writer._agent,'_table'):
_input['table'] = writer._agent._table
for _item in _schema :
if _item['type'] in ['INTEGER','BIGINT','INT'] :
_column = _item['name']
_data[_column] = _data[_column].copy().fillna(0).astype(np.int64)
writer.write(_data) writer.write(_data)
except Exception as e: writer.close()
_action = 'post-error'
_input['error'] = str(e)
print ([e])
pass
self.log(action=_action,object=writer._agent.__module__, input= _input)
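For orientation, the IETL class reworked above is what the CLI's etl run command ultimately drives: each item of the configuration file pairs one source with one or more targets. Below is a minimal sketch of invoking it programmatically; the connection parameters are modeled on the provider templates elsewhere in this compare but are otherwise made up, and calling IETL directly (rather than through the CLI) is an assumption based only on the constructor shown above.

    from transport.iowrapper import IETL

    # one ETL item: read everything from the source, then post it to each target
    _item = {
        'source': {'provider':'mongodb','host':'localhost','port':27017,
                   'db':'demo','collection':'friends'},
        'target': [{'provider':'bigquery','private_key':'path-to-key',
                    'dataset':'demo','table':'friends'}]
    }
    _etl = IETL(**_item)
    _etl.run()   # run() performs the read and writes the result to every target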

@ -11,8 +11,7 @@ import sys
# from transport.common import Reader, Writer # from transport.common import Reader, Writer
from datetime import datetime from datetime import datetime
def template():
return {'dbname':'database','doc':'document','username':'username','password':'password','url':'url-with-port'}
class Couch: class Couch:
""" """
This class is a wrapper for read/write against couchdb. The class captures common operations for read/write. This class is a wrapper for read/write against couchdb. The class captures common operations for read/write.
@ -20,7 +19,6 @@ class Couch:
@param doc user id involved @param doc user id involved
@param dbname database name (target) @param dbname database name (target)
""" """
__template__={"url":None,"doc":None,"dbname":None,"username":None,"password":None}
def __init__(self,**args): def __init__(self,**args):
url = args['url'] if 'url' in args else 'http://localhost:5984' url = args['url'] if 'url' in args else 'http://localhost:5984'
self._id = args['doc'] self._id = args['doc']

@ -20,15 +20,11 @@ import re
from multiprocessing import Lock, RLock from multiprocessing import Lock, RLock
from transport.common import IEncoder from transport.common import IEncoder
def template():
return {'provider':'mongodb','host':'localhost','port':27017,'db':'db-name','collection':'collection-name','username':'username','password':'password','mechanism':'SCRAM-SHA-256'}
class Mongo : class Mongo :
lock = RLock() lock = RLock()
""" """
Basic mongodb functions are captured here Basic mongodb functions are captured here
""" """
__template__={"db":None,"collection":None,"host":None,"port":None,"username":None,"password":None}
def __init__(self,**args): def __init__(self,**args):
""" """
:dbname database name/identifier :dbname database name/identifier

@ -9,7 +9,7 @@ import numpy as np
import pandas as pd import pandas as pd
class Writer : class Writer :
lock = None lock = Lock()
_queue = {'default':queue.Queue()} _queue = {'default':queue.Queue()}
def __init__(self,**_args): def __init__(self,**_args):
self._cache = {} self._cache = {}

@ -4,10 +4,6 @@ This file is a wrapper around pandas built-in functionalities to handle characte
import pandas as pd import pandas as pd
import numpy as np import numpy as np
import os import os
def template():
return {'path':None,'delimiter':None}
class File : class File :
def __init__(self,**params): def __init__(self,**params):
""" """
@ -16,7 +12,7 @@ class File :
""" """
self.path = params['path'] if 'path' in params else None self.path = params['path'] if 'path' in params else None
self.delimiter = params['delimiter'] if 'delimiter' in params else ',' self.delimiter = params['delimiter'] if 'delimiter' in params else ','
self._chunksize = None if 'chunksize' not in params else int(params['chunksize'])
def isready(self): def isready(self):
return os.path.exists(self.path) return os.path.exists(self.path)
def meta(self,**_args): def meta(self,**_args):
@ -30,19 +26,11 @@ class Reader (File):
def __init__(self,**_args): def __init__(self,**_args):
super().__init__(**_args) super().__init__(**_args)
def _stream(self,path) :
reader = pd.read_csv(path,sep=self.delimiter,chunksize=self._chunksize,low_memory=False)
for segment in reader :
yield segment
def read(self,**args): def read(self,**args):
_path = self.path if 'path' not in args else args['path'] _path = self.path if 'path' not in args else args['path']
_delimiter = self.delimiter if 'delimiter' not in args else args['delimiter'] _delimiter = self.delimiter if 'delimiter' not in args else args['delimiter']
return pd.read_csv(_path,delimiter=self.delimiter)
_df = pd.read_csv(_path,sep=self.delimiter) if not self._chunksize else self._stream(_path)
if 'query' in args :
_query = args['query']
_df = _df.query(_query)
return _df
def stream(self,**args): def stream(self,**args):
raise Exception ("streaming needs to be implemented") raise Exception ("streaming needs to be implemented")
class Writer (File): class Writer (File):

@ -7,8 +7,6 @@ import requests
from io import StringIO from io import StringIO
import pandas as pd import pandas as pd
def template():
return {'url':None,'headers':{'key':'value'}}
class Reader: class Reader:
""" """

@ -17,10 +17,6 @@ import sys
# from common import Reader, Writer # from common import Reader, Writer
import json import json
from multiprocessing import RLock from multiprocessing import RLock
def template():
return {'port':5672,'host':'localhost','queue':None,'vhost':None,'username':None,'password':None}
class MessageQueue: class MessageQueue:
""" """
This class hierarchy is designed to handle interactions with a queue server using pika framework (our tests are based on rabbitmq) This class hierarchy is designed to handle interactions with a queue server using pika framework (our tests are based on rabbitmq)

@ -59,6 +59,9 @@ class PluginLoader :
pass pass
def load (self,**_args): def load (self,**_args):
"""
This function loads a plugin
"""
self._modules = {} self._modules = {}
self._names = [] self._names = []
path = _args ['path'] path = _args ['path']

@ -1,6 +1,6 @@
import os import os
import json import json
from transport.info import __version__ from info import __version__
import copy import copy
import transport import transport
import importlib import importlib
@ -12,7 +12,6 @@ from io import StringIO
This class manages data from the registry and allows (read only) This class manages data from the registry and allows (read only)
@TODO: add property to the DATA attribute @TODO: add property to the DATA attribute
""" """
if 'HOME' in os.environ : if 'HOME' in os.environ :
REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport']) REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport'])
else: else:
@ -26,6 +25,7 @@ if 'DATA_TRANSPORT_REGISTRY_PATH' in os.environ :
REGISTRY_FILE= 'transport-registry.json' REGISTRY_FILE= 'transport-registry.json'
DATA = {} DATA = {}
def isloaded (): def isloaded ():
return DATA not in [{},None] return DATA not in [{},None]
def exists (path=REGISTRY_PATH,_file=REGISTRY_FILE) : def exists (path=REGISTRY_PATH,_file=REGISTRY_FILE) :

@ -3,7 +3,7 @@ This namespace/package wrap the sql functionalities for a certain data-stores
- netezza, postgresql, mysql and sqlite - netezza, postgresql, mysql and sqlite
- mariadb, redshift (also included) - mariadb, redshift (also included)
""" """
from . import postgresql, mysql, netezza, sqlite3, sqlserver, duckdb from . import postgresql, mysql, netezza, sqlite, sqlserver, duckdb
# #
@ -11,7 +11,7 @@ from . import postgresql, mysql, netezza, sqlite3, sqlserver, duckdb
# #
mariadb = mysql mariadb = mysql
redshift = postgresql redshift = postgresql
# sqlite3 = sqlite sqlite3 = sqlite
# from transport import sql # from transport import sql

@ -1,14 +1,11 @@
""" """
This file encapsulates common operations associated with SQL databases via SQLAlchemy This file encapsulates common operations associated with SQL databases via SQLAlchemy
@ENT:
- To support streaming (with generators) we the parameter chunksize which essentially enables streaming
""" """
import sqlalchemy as sqa import sqlalchemy as sqa
from sqlalchemy import text , MetaData, inspect from sqlalchemy import text , MetaData, inspect
import pandas as pd import pandas as pd
def template():
return {'host':'localhost','database':'database','table':'table'}
class Base: class Base:
def __init__(self,**_args): def __init__(self,**_args):
@ -23,7 +20,6 @@ class Base:
_uri,_kwargs = _uri _uri,_kwargs = _uri
self._engine= sqa.create_engine(_uri,**_kwargs,future=True) self._engine= sqa.create_engine(_uri,**_kwargs,future=True)
self._chunksize = int(_args['chunksize']) if 'chunksize' in _args else None
def _set_uri(self,**_args) : def _set_uri(self,**_args) :
""" """
:provider provider :provider provider
@ -71,7 +67,6 @@ class Base:
# else: # else:
return [] return []
def has(self,**_args): def has(self,**_args):
return self.meta(**_args) return self.meta(**_args)
def apply(self,sql): def apply(self,sql):
@ -83,14 +78,10 @@ class Base:
""" """
if sql.strip().lower().startswith('select') or sql.strip().lower().startswith('with') or sql.strip().startswith('show'): if sql.strip().lower().startswith('select') or sql.strip().lower().startswith('with') or sql.strip().startswith('show'):
if not self._chunksize: return pd.read_sql(sql,self._engine)
return pd.read_sql(sql,self._engine)
else:
return pd.read_sql(sql,self._engine,chunksize=self._chunksize)
else: else:
_handler = self._engine.connect() _handler = self._engine.connect()
_handler.execute(text(sql.strip())) _handler.execute(text(sql))
_handler.commit () _handler.commit ()
_handler.close() _handler.close()
return None return None
@ -141,21 +132,18 @@ class BaseReader(SQLBase):
if self._schema and type(self._schema) == str : if self._schema and type(self._schema) == str :
_table = f'{self._schema}.{_table}' _table = f'{self._schema}.{_table}'
sql = f'SELECT * FROM {_table}' sql = f'SELECT * FROM {_table}'
if 'chunksize' in _args :
self._chunksize = int(_args['chunksize'])
return self.apply(sql) return self.apply(sql)
class BaseWriter (SQLBase): class BaseWriter (SQLBase):
""" """
This class implements SQLAlchemy support for Writing to a data-store (RDBMS) This class implements SQLAlchemy support for Writing to a data-store (RDBMS)
""" """
def __init__(self,**_args): def __init__(self,**_args):
super().__init__(**_args) super().__init__(**_args)
def write(self,_data,**_args): def write(self,_data,**_args):
if type(_data) == dict : if type(_data) == dict :
_df = pd.DataFrame([_data]) _df = pd.DataFrame([_data])
elif type(_data) == list : elif type(_data) == list :

@ -3,9 +3,6 @@ This module implements the handler for duckdb (in memory or not)
""" """
from transport.sql.common import Base, BaseReader, BaseWriter from transport.sql.common import Base, BaseReader, BaseWriter
def template ():
return {'database':'path-to-database','table':'table'}
class Duck : class Duck :
def __init__(self,**_args): def __init__(self,**_args):
# #

@ -1,11 +1,8 @@
""" """
This file implements support for mysql and maria db (with drivers mysql+mysql) This file implements support for mysql and maria db (with drivers mysql+mysql)
""" """
from transport.sql.common import BaseReader, BaseWriter, template as _template from transport.sql.common import BaseReader, BaseWriter
# import mysql.connector as my # import mysql.connector as my
def template ():
return dict(_template(),**{'port':3306})
class MYSQL: class MYSQL:
def get_provider(self): def get_provider(self):

@ -1,8 +1,5 @@
import nzpy as nz import nzpy as nz
from transport.sql.common import BaseReader, BaseWriter , template as _template from transport.sql.common import BaseReader, BaseWriter
def template ():
return dict(_template(),**{'port':5480,'chunksize':10000})
class Netezza: class Netezza:
def get_provider(self): def get_provider(self):

@ -1,10 +1,7 @@
from transport.sql.common import BaseReader , BaseWriter, template as _template from transport.sql.common import BaseReader , BaseWriter
from psycopg2.extensions import register_adapter, AsIs from psycopg2.extensions import register_adapter, AsIs
import numpy as np import numpy as np
def template ():
return dict(_template(),**{'port':5432,'chunksize':10000})
register_adapter(np.int64, AsIs) register_adapter(np.int64, AsIs)

@ -1,11 +1,7 @@
import sqlalchemy import sqlalchemy
import pandas as pd import pandas as pd
from transport.sql.common import Base, BaseReader, BaseWriter from transport.sql.common import Base, BaseReader, BaseWriter
from multiprocessing import RLock class SQLite (BaseReader):
def template():
return {'database':'path-to-database','table':'table'}
class SQLite3 :
lock = RLock()
def __init__(self,**_args): def __init__(self,**_args):
super().__init__(**_args) super().__init__(**_args)
if 'path' in _args : if 'path' in _args :
@ -16,7 +12,7 @@ class SQLite3 :
path = self._database path = self._database
return f'sqlite:///{path}' # ensure this is the correct path for the sqlite file. return f'sqlite:///{path}' # ensure this is the correct path for the sqlite file.
class Reader(SQLite3,BaseReader): class Reader(SQLite,BaseReader):
def __init__(self,**_args): def __init__(self,**_args):
super().__init__(**_args) super().__init__(**_args)
# def read(self,**_args): # def read(self,**_args):
@ -24,12 +20,6 @@ class Reader(SQLite3,BaseReader):
# return pd.read_sql(sql,self._engine) # return pd.read_sql(sql,self._engine)
class Writer (SQLite3,BaseWriter): class Writer (SQLite,BaseWriter):
def __init__(self,**_args): def __init__(self,**_args):
super().__init__(**_args) super().__init__(**_args)
def write(self,_data,**_kwargs):
try:
SQLite3.lock.acquire()
super().write(_data,**_kwargs)
finally:
SQLite3.lock.release()

@ -3,15 +3,10 @@ Handling Microsoft SQL Server via pymssql driver/connector
""" """
import sqlalchemy import sqlalchemy
import pandas as pd import pandas as pd
from transport.sql.common import Base, BaseReader, BaseWriter, template as _template from transport.sql.common import Base, BaseReader, BaseWriter
def template ():
return dict(_template(),**{'port':1433})
class MsSQLServer: class MsSQLServer:
def __init__(self,**_args) : def __init__(self,**_args) :
super().__init__(**_args) super().__init__(**_args)
pass pass

@ -3,8 +3,6 @@ import pandas as pd
from .. sql.common import BaseReader , BaseWriter from .. sql.common import BaseReader , BaseWriter
import sqlalchemy as sqa import sqlalchemy as sqa
def template():
return {'host':'localhost','port':8047,'ssl':False,'table':None,'database':None}
class Drill : class Drill :
__template = {'host':None,'port':None,'ssl':None,'table':None,'database':None} __template = {'host':None,'port':None,'ssl':None,'table':None,'database':None}
def __init__(self,**_args): def __init__(self,**_args):

@ -11,10 +11,6 @@ from pyspark.sql.types import *
from pyspark.sql.functions import col, to_date, to_timestamp from pyspark.sql.functions import col, to_date, to_timestamp
import copy import copy
def template():
return {'catalog':None,'database':None,'table':None}
class Iceberg : class Iceberg :
def __init__(self,**_args): def __init__(self,**_args):
""" """
