bug fixes: ETL; added provider template functions

v2.4
Steve Nyemba 5 days ago
parent 8381f4cbc0
commit b80c076ec9

@@ -53,6 +53,8 @@ def wait(jobs):
    while jobs :
        jobs = [thread for thread in jobs if thread.is_alive()]
        time.sleep(1)
def job (_args):
    pass
@app_e.command(name="run")
def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")],
    index:int = typer.Option(default= None, help="index of the item of interest, otherwise everything in the file will be processed"),
@@ -177,7 +179,25 @@ def register (label:Annotated[str,typer.Argument(help="unique label that will be
    except Exception as e:
        _msg = f"""{TIMES_MARK} {e}"""
        print (_msg)
@app_r.command(name="template")
def template(name:Annotated[str,typer.Argument(help="database technology provider" ) ]):
    """
    This function will generate a template entry for the registry (the content of an auth file)
    """
    #
    # retrieve the provider and display the template if it has one
    _entry = {}
    for _module in ['sql','cloud','warehouse','nosql','other'] :
        ref = getattr(transport,_module) if hasattr(transport,_module) else None
        if ref and hasattr(ref,name) :
            _pointer = getattr(ref,name)
            _entry = dict({'provider':name},**_pointer.template()) if hasattr(_pointer,'template') else {}
            break
    print ( json.dumps(_entry))
@app_r.command(name="list")
def register_list ():
    """

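For illustration, a minimal sketch of what the new "template" command prints for one provider; the module path and keys are taken from the template functions added further down in this commit, and anything beyond that is an assumption:

    import json
    from transport.sql import postgresql   # module layout assumed from this diff

    # mirror the command: tag the provider name onto its template()
    print(json.dumps(dict({'provider':'postgresql'}, **postgresql.template())))
    # -> {"provider": "postgresql", "host": "localhost", "database": "database",
    #     "table": "table", "port": 5432, "chunksize": 10000}
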
@@ -14,7 +14,7 @@
},
{
"cell_type": "code",
"execution_count": 1,
"execution_count": null,
"metadata": {},
"outputs": [
{
@@ -58,7 +58,7 @@
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": null,
"metadata": {},
"outputs": [
{
@@ -103,7 +103,7 @@
},
{
"cell_type": "code",
"execution_count": 4,
"execution_count": null,
"metadata": {},
"outputs": [
{
@@ -131,16 +131,28 @@
]
},
{
"cell_type": "code",
"execution_count": null,
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Streaming Large Volumes of Data\n",
"\n",
"It is recommended for large volumes of data to stream the data using **chunksize** as a parameter \n",
"\n",
"1. in the **read** method \n",
"2. or **transport.get.reader(\\*\\*...,chunksize=1000)**\n",
"\n",
"Use streaming because with large volumes of data some databases limit the volume of data for a single transaction in order to efficiently guarantee maintain **data integrity**"
]
},
{
"cell_type": "markdown",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"display_name": "python (3.10.12)",
"language": "python",
"name": "python3"
},
@@ -154,7 +166,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.7"
"version": "3.10.12"
}
},
"nbformat": 4,

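A hedged sketch of the streaming read the markdown cell above recommends; the provider arguments are placeholders taken from the postgresql template in this commit, and the generator behavior follows from the chunksize handling added to the readers below:

    import transport

    _reader = transport.get.reader(provider='postgresql', host='localhost',
                                   database='database', table='table',
                                   chunksize=1000)
    for _frame in _reader.read():   # with chunksize set, read() yields DataFrames
        print(_frame.shape)
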
@@ -14,6 +14,9 @@ import numpy as np
import time
MAX_CHUNK = 2000000
def template ():
    return {'provider':'bigquery','private_key':'path-to-key','dataset':'name-of-dataset','table':'table','chunksize':MAX_CHUNK}
class BigQuery:
    __template__= {"private_key":None,"dataset":None,"table":None}
    def __init__(self,**_args):
@@ -24,6 +27,7 @@ class BigQuery:
        self.dtypes = _args['dtypes'] if 'dtypes' in _args else None
        self.table = _args['table'] if 'table' in _args else None
        self.client = bq.Client.from_service_account_json(self.path)
        self._chunksize = int(_args['chunksize']) if 'chunksize' in _args else None
    def meta(self,**_args):
        """
        This function returns metadata for a given table or query, with dataset/table properly formatted
@@ -81,7 +85,14 @@ class Reader (BigQuery):
            SQL += " LIMIT "+str(_args['limit'])
        if (':dataset' in SQL or ':DATASET' in SQL) and self.dataset:
            SQL = SQL.replace(':dataset',self.dataset).replace(':DATASET',self.dataset)
        _info = {'credentials':self.credentials,'dialect':'standard'}
        _info = {'credentials':self.credentials,'dialect':'standard'}
        #
        # @Ent-Feature : adding streaming capability here
        #
        if 'chunksize' in _args :
            self._chunksize = int(_args['chunksize'])
        if self._chunksize :
            _info['chunksize'] = self._chunksize
        return pd_gbq.read_gbq(SQL,**_info) if SQL else None
        # return self.client.query(SQL).to_dataframe() if SQL else None

@@ -17,6 +17,8 @@ import sqlalchemy
# from transport.common import Reader,Writer
import pandas as pd
def template ():
    return {'provider':'databricks','host':'fqn-host','token':'token','cluster_path':'path-of-cluster','catalog':'name-of-catalog','database':'schema-or-database','table':'table','chunksize':10000}
class Bricks:
    """
@@ -42,6 +44,7 @@ class Bricks:
        _uri = f'''databricks+connector://token:{_token}@{_host}?http_path={_cluster_path}&catalog={_catalog}&schema={self._schema}'''
        self._engine = sqlalchemy.create_engine (_uri)
        self._chunksize = int(_args['chunksize']) if 'chunksize' in _args else None
        pass
    def meta(self,**_args):
        table = _args['table'] if 'table' in _args else self._table
@@ -64,7 +67,14 @@ class Bricks:
    def apply(self,_sql):
        try:
            if _sql.lower().startswith('select') :
                return pd.read_sql(_sql,self._engine)
                #
                # @ENT-Feature: adding streaming functions/variables
                if not self._chunksize :
                    return pd.read_sql(_sql,self._engine)
                else:
                    return pd.read_sql(_sql,self._engine,chunksize=self._chunksize)
        except Exception as e:
            pass
@@ -84,7 +94,10 @@ class Reader(Bricks):
        sql = f'SELECT * FROM {table}'
        if limit :
            sql = sql + f' LIMIT {limit}'
        #
        # @ENT-Feature: adding streaming functions/variables
        if 'chunksize' in _args :
            self._chunksize = int(_args['chunksize'])
        if 'sql' in _args or 'table' in _args :
            return self.apply(sql)
        else:

@@ -8,7 +8,8 @@ import pandas as pd
from io import StringIO
import json
import nextcloud_client as nextcloud
def template():
    return {"url":None,"token":None,"uid":None,"file":None}
class Nextcloud :
    __template__={"url":None,"token":None,"uid":None,"file":None}
    def __init__(self,**_args):

@@ -20,6 +20,9 @@ from io import StringIO
import pandas as pd
import json
def template():
    return {'access_key':'access-key','secret_key':'secret-key','region':'region','bucket':'name-of-bucket','file':'file-name','chunksize':10000}
class s3 :
    """
    @TODO: Implement a search function for a file given a bucket??
@@ -38,6 +41,7 @@ class s3 :
            self._bucket_name = args['bucket']
            self._file_name = args['file']
            self._region = args['region']
            self._chunksize = int(args['chunksize']) if 'chunksize' in args else None
        except Exception as e :
            print (e)
        pass
@@ -89,7 +93,10 @@ class Reader(s3) :
        if not _stream :
            return None
        if _object['ContentType'] in ['text/csv'] :
            return pd.read_csv(StringIO(str(_stream).replace("\\n","\n").replace("\\r","").replace("\'","")))
            if not self._chunksize :
                return pd.read_csv(StringIO(str(_stream).replace("\\n","\n").replace("\\r","").replace("\'","")))
            else:
                return pd.read_csv(StringIO(str(_stream).replace("\\n","\n").replace("\\r","").replace("\'","")),chunksize=self._chunksize)
        else:
            return _stream

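The branch above leans on a pandas behavior worth making explicit: when chunksize is passed, read_csv returns an iterator of DataFrames instead of a single frame. A self-contained sketch (the CSV data is made up):

    import pandas as pd
    from io import StringIO

    _csv = "a,b\n1,2\n3,4\n5,6\n"
    for _frame in pd.read_csv(StringIO(_csv), chunksize=2):
        print(_frame.shape)   # (2, 2) then (1, 2)
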
@@ -163,6 +163,7 @@ class IReader(IO):
        _objectName = '.'.join([self._agent.__class__.__module__,self._agent.__class__.__name__])
        if types.GeneratorType == type(_data):
            return self._stream(_data)
        # if self._plugins :
        #     return self._stream(_data)
@@ -278,7 +279,7 @@ class IETL(BaseIO) :
                if _item['type'] in ['INTEGER','BIGINT','INT'] :
                    _column = _item['name']
                    _data[_column] = _data[_column].copy().fillna(0).astype(np.int64)
            writer.write(_data,schema=_schema)
            writer.write(_data)
        except Exception as e:
            _action = 'post-error'
            _input['error'] = str(e)

@@ -11,7 +11,8 @@ import sys
# from transport.common import Reader, Writer
from datetime import datetime
def template():
    return {'dbname':'database','doc':'document','username':'username','password':'password','url':'url-with-port'}
class Couch:
    """
    This class is a wrapper for read/write against couchdb; it captures the operations common to readers and writers.

@@ -20,6 +20,9 @@ import re
from multiprocessing import Lock, RLock
from transport.common import IEncoder
def template():
    return {'provider':'mongodb','host':'localhost','port':27017,'db':'db-name','collection':'collection-name','username':'username','password':'password','mechanism':'SCRAM-SHA-256'}
class Mongo :
    lock = RLock()
    """

@@ -4,6 +4,10 @@ This file is a wrapper around pandas built-in functionalities to handle characte
import pandas as pd
import numpy as np
import os
def template():
    return {'path':None,'delimiter':None}
class File :
    def __init__(self,**params):
        """

@@ -7,6 +7,8 @@ import requests
from io import StringIO
import pandas as pd
def template():
    return {'url':None,'headers':{'key':'value'}}
class Reader:
    """

@@ -17,6 +17,10 @@ import sys
# from common import Reader, Writer
import json
from multiprocessing import RLock
def template():
    return {'port':5672,'host':'localhost','queue':None,'vhost':None,'username':None,'password':None}
class MessageQueue:
    """
    This class hierarchy is designed to handle interactions with a queue server using the pika framework (our tests are based on rabbitmq)

@@ -1,11 +1,14 @@
"""
This file encapsulates common operations associated with SQL databases via SQLAlchemy
@ENT:
    - To support streaming (with generators), we added the parameter chunksize, which enables streaming reads
"""
import sqlalchemy as sqa
from sqlalchemy import text , MetaData, inspect
import pandas as pd
def template():
    return {'host':'localhost','database':'database','table':'table'}
class Base:
    def __init__(self,**_args):
@@ -20,6 +23,7 @@ class Base:
            _uri,_kwargs = _uri
        self._engine= sqa.create_engine(_uri,**_kwargs,future=True)
        self._chunksize = int(_args['chunksize']) if 'chunksize' in _args else None
    def _set_uri(self,**_args) :
        """
        :provider provider
@@ -78,7 +82,12 @@ class Base:
        @TODO: Execution of stored procedures
        """
        if sql.strip().lower().startswith('select') or sql.strip().lower().startswith('with') or sql.strip().startswith('show'):
            return pd.read_sql(sql,self._engine)
            if not self._chunksize:
                return pd.read_sql(sql,self._engine)
            else:
                return pd.read_sql(sql,self._engine,chunksize=self._chunksize)
        else:
            _handler = self._engine.connect()
            _handler.execute(text(sql.strip()))
@@ -132,7 +141,12 @@ class BaseReader(SQLBase):
        if self._schema and type(self._schema) == str :
            _table = f'{self._schema}.{_table}'
        sql = f'SELECT * FROM {_table}'
        if 'chunksize' in _args :
            self._chunksize = int(_args['chunksize'])
        return self.apply(sql)
class BaseWriter (SQLBase):

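The chunksize switch in apply() relies on the same pandas contract: with chunksize set, pd.read_sql returns an iterator of DataFrames rather than one frame. A minimal sketch against a throwaway in-memory SQLite engine (table name and data are made up):

    import pandas as pd
    import sqlalchemy as sqa

    _engine = sqa.create_engine('sqlite://')   # in-memory, for illustration only
    pd.DataFrame({'x': range(10)}).to_sql('demo', _engine, index=False)
    for _frame in pd.read_sql('SELECT * FROM demo', _engine, chunksize=4):
        print(_frame.shape)                    # (4, 1), (4, 1), (2, 1)
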
@@ -3,6 +3,9 @@ This module implements the handler for duckdb (in memory or not)
"""
from transport.sql.common import Base, BaseReader, BaseWriter
def template ():
    return {'database':'path-to-database','table':'table'}
class Duck :
    def __init__(self,**_args):
        #

@@ -1,8 +1,11 @@
"""
This file implements support for mysql and mariadb (with the mysql+ drivers)
"""
from transport.sql.common import BaseReader, BaseWriter
from transport.sql.common import BaseReader, BaseWriter, template as _template
# import mysql.connector as my
def template ():
    return dict(_template(),**{'port':3306})
class MYSQL:
    def get_provider(self):

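The pattern above (repeated for netezza, postgresql and mssql below) overlays provider defaults on the shared SQL template; the merge itself is plain dict composition:

    # the shared template from transport.sql.common, as added in this commit
    _base = {'host':'localhost','database':'database','table':'table'}
    print(dict(_base, **{'port': 3306}))
    # -> {'host': 'localhost', 'database': 'database', 'table': 'table', 'port': 3306}
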
@@ -1,5 +1,8 @@
import nzpy as nz
from transport.sql.common import BaseReader, BaseWriter
from transport.sql.common import BaseReader, BaseWriter , template as _template
def template ():
    return dict(_template(),**{'port':5480,'chunksize':10000})
class Netezza:
    def get_provider(self):

@@ -1,7 +1,10 @@
from transport.sql.common import BaseReader , BaseWriter
from transport.sql.common import BaseReader , BaseWriter, template as _template
from psycopg2.extensions import register_adapter, AsIs
import numpy as np
def template ():
    return dict(_template(),**{'port':5432,'chunksize':10000})
register_adapter(np.int64, AsIs)

@@ -2,6 +2,8 @@ import sqlalchemy
import pandas as pd
from transport.sql.common import Base, BaseReader, BaseWriter
from multiprocessing import RLock
def template():
    return {'database':'path-to-database','table':'table'}
class SQLite3 :
    lock = RLock()
    def __init__(self,**_args):

@@ -3,7 +3,11 @@ Handling Microsoft SQL Server via pymssql driver/connector
"""
import sqlalchemy
import pandas as pd
from transport.sql.common import Base, BaseReader, BaseWriter
from transport.sql.common import Base, BaseReader, BaseWriter, template as _template
def template ():
    return dict(_template(),**{'port':1433})
class MsSQLServer:

@@ -3,6 +3,8 @@ import pandas as pd
from .. sql.common import BaseReader , BaseWriter
import sqlalchemy as sqa
def template():
    return {'host':'localhost','port':8047,'ssl':False,'table':None,'database':None}
class Drill :
    __template = {'host':None,'port':None,'ssl':None,'table':None,'database':None}
    def __init__(self,**_args):

@@ -11,6 +11,10 @@ from pyspark.sql.types import *
from pyspark.sql.functions import col, to_date, to_timestamp
import copy
def template():
    return {'catalog':None,'database':None,'table':None}
class Iceberg :
    def __init__(self,**_args):
        """
