bug fixes: stream, drill,iceberg

v2.4
Steve Nyemba 1 week ago
parent 5dbe541025
commit a28848194a

@ -19,7 +19,7 @@ args = {
"packages": find_packages(include=['info','transport', 'transport.*'])} "packages": find_packages(include=['info','transport', 'transport.*'])}
args["keywords"]=['mongodb','duckdb','couchdb','rabbitmq','file','read','write','s3','sqlite'] args["keywords"]=['mongodb','duckdb','couchdb','rabbitmq','file','read','write','s3','sqlite']
args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','termcolor','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql','pyspark','pydrill'] args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','termcolor','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql','pyspark','pydrill','sqlalchemy_drill`']
args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git" args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git"
args['scripts'] = ['bin/transport'] args['scripts'] = ['bin/transport']
# if sys.version_info[0] == 2 : # if sys.version_info[0] == 2 :

@ -50,9 +50,6 @@ class IO:
for key in _data : for key in _data :
_data[key] = str(_data[key]) if type(_data[key]) not in [list,dict] else json.dumps(_data[key]) _data[key] = str(_data[key]) if type(_data[key]) not in [list,dict] else json.dumps(_data[key])
self._logger.write(pd.DataFrame([_data])) #,table=self._logTable) self._logger.write(pd.DataFrame([_data])) #,table=self._logTable)
else:
print ([' ********** '])
print (_args)
def _init_plugins(self,_items): def _init_plugins(self,_items):
""" """
This function will load pipelined functions as a plugin loader This function will load pipelined functions as a plugin loader
@ -105,9 +102,12 @@ class IReader(IO):
_shape = [] _shape = []
for _segment in _data : for _segment in _data :
_shape.append(list(_segment.shape)) _shape.append(list(_segment.shape))
yield self._plugins.apply(_segment,self.log) if self._plugins :
yield self._plugins.apply(_segment,self.log)
self.log(action='streaming',object=self._agent._engine.name, input= {'shape':_shape}) else:
yield _segment
_objectName = '.'.join([self._agent.__class__.__module__,self._agent.__class__.__name__])
self.log(action='streaming',object=_objectName, input= {'shape':_shape})
def read(self,**_args): def read(self,**_args):
@ -116,37 +116,28 @@ class IReader(IO):
self._init_plugins(_args['plugins']) self._init_plugins(_args['plugins'])
_data = self._agent.read(**_args) _data = self._agent.read(**_args)
_objectName = '.'.join([self._agent.__class__.__module__,self._agent.__class__.__name__]) _objectName = '.'.join([self._agent.__class__.__module__,self._agent.__class__.__name__])
if types.GeneratorType == type(_data): if types.GeneratorType == type(_data):
if self._plugins : return self._stream(_data)
return self._stream(_data) # if self._plugins :
else: # return self._stream(_data)
_count = 0 # else:
for _segment in _data : # _count = 0
_count += 1 # for _segment in _data :
yield _segment # _count += 1
self.log(action='streaming',object=_objectName, input= {'segments':_count}) # yield _segment
# self.log(action='streaming',object=_objectName, input= {'segments':_count})
# return _data # return _data
else: elif type(_data) == pd.DataFrame :
_shape = _data.shape #[0,0] if not _data.shape[] else list(_data.shape)
self.log(action='read',object=_objectName, input=_shape)
self.log(action='read',object=_objectName, input=_data.shape)
if self._plugins : if self._plugins :
_logs = [] _logs = []
_data = self._plugins.apply(_data,self.log) _data = self._plugins.apply(_data,self.log)
return _data return _data
# if self._plugins and self._plugins.ratio() > 0 :
# if types.GeneratorType == type(_data):
# return self._stream(_data)
# else:
# _data = self._plugins.apply(_data)
# return _data
# else:
# self.log(action='read',object=self._agent._engine.name, input=_data.shape)
# return _data
class IWriter(IO): class IWriter(IO):
lock = RLock() lock = RLock()
def __init__(self,_agent,pipeline=None,_logger=None): def __init__(self,_agent,pipeline=None,_logger=None):
@ -180,7 +171,6 @@ class IETL(IReader) :
# super().__init__(transport.get.reader(**_args['source'])) # super().__init__(transport.get.reader(**_args['source']))
super().__init__(transport.get.reader(**_source),_plugins) super().__init__(transport.get.reader(**_source),_plugins)
# _logger = # _logger =
if 'target' in _args: if 'target' in _args:
self._targets = _args['target'] if type(_args['target']) == list else [_args['target']] self._targets = _args['target'] if type(_args['target']) == list else [_args['target']]
@ -189,6 +179,9 @@ class IETL(IReader) :
self.jobs = [] self.jobs = []
# #
# If the parent is already multiprocessing # If the parent is already multiprocessing
if 'token' in _source :
self._logToken = _source['token']
self._sourceArgs = _source['args'] if 'args' in _source else None
self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess'] self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess']
# def run(self) : # def run(self) :
# """ # """
@ -196,7 +189,7 @@ class IETL(IReader) :
# """ # """
# return self.read() # return self.read()
def run(self,**_args): def run(self,**_args):
_data = super().read(**_args) _data = super().read(**_args) if not self._sourceArgs else super().read(**self._sourceArgs)
self._targets = [transport.get.writer(**_kwargs) for _kwargs in self._targets] self._targets = [transport.get.writer(**_kwargs) for _kwargs in self._targets]
if types.GeneratorType == type(_data): if types.GeneratorType == type(_data):
_index = 0 _index = 0

@ -9,7 +9,7 @@ import numpy as np
import pandas as pd import pandas as pd
class Writer : class Writer :
lock = Lock() lock = None
_queue = {'default':queue.Queue()} _queue = {'default':queue.Queue()}
def __init__(self,**_args): def __init__(self,**_args):
self._cache = {} self._cache = {}

@ -62,10 +62,13 @@ class Base:
@TODO: Execution of stored procedures @TODO: Execution of stored procedures
""" """
if sql.lower().startswith('select') or sql.lower().startswith('with') :
if sql.lower().replace('\n',' ').strip().startswith('select') or sql.lower().startswith('with') :
if self._chunksize : if self._chunksize :
return pd.read_sql(sql,self._engine,chunksize=self._chunksize) return pd.read_sql(sql,self._engine,chunksize=self._chunksize)
else: else:
return pd.read_sql(sql,self._engine) return pd.read_sql(sql,self._engine)
else: else:
_handler = self._engine.connect() _handler = self._engine.connect()
@ -114,7 +117,8 @@ class BaseReader(SQLBase):
# print (dir (self)) # print (dir (self))
_table = _args['table'] if 'table' in _args else self._table _table = _args['table'] if 'table' in _args else self._table
sql = f'SELECT * FROM {_table}' sql = f'SELECT * FROM {_table}'
return self.apply(sql)
return self.apply(sql.replace('\n',' ').strip())
class BaseWriter (SQLBase): class BaseWriter (SQLBase):

@ -4,6 +4,7 @@ from .. sql.common import BaseReader , BaseWriter
import sqlalchemy as sqa import sqlalchemy as sqa
class Drill : class Drill :
__template = {'host':None,'port':None,'ssl':None,'table':None,'database':None}
def __init__(self,**_args): def __init__(self,**_args):
self._host = _args['host'] if 'host' in _args else 'localhost' self._host = _args['host'] if 'host' in _args else 'localhost'
@ -21,7 +22,7 @@ class Drill :
self._schema = self._database.split('.')[0] self._schema = self._database.split('.')[0]
def _get_uri(self,**_args): def _get_uri(self,**_args):
return f'drill+sadrill://{self._host}:{self._port}/{self._schema}?use_ssl={self._ssl}' return f'drill+sadrill://{self._host}:{self._port}/{self._database}?use_ssl={self._ssl}'
def get_provider(self): def get_provider(self):
return "drill+sadrill" return "drill+sadrill"
def get_default_port(self): def get_default_port(self):

@ -64,7 +64,11 @@ class Iceberg :
_datName = self._databaseName if 'database' not in _args else _args['database'] _datName = self._databaseName if 'database' not in _args else _args['database']
return '.'.join([_catName,_datName]) return '.'.join([_catName,_datName])
def apply(self,_query):
"""
sql query/command to run against apache iceberg
"""
return self._session.sql(_query)
def has (self,**_args): def has (self,**_args):
try: try:
_prefix = self._getPrefix(**_args) _prefix = self._getPrefix(**_args)
@ -74,8 +78,7 @@ class Iceberg :
except Exception as e: except Exception as e:
print (e) print (e)
return False return False
def apply(self,sql):
pass
def close(self): def close(self):
self._session.stop() self._session.stop()
class Reader(Iceberg) : class Reader(Iceberg) :

Loading…
Cancel
Save