diff --git a/setup.py b/setup.py index c7ae339..0c729f4 100644 --- a/setup.py +++ b/setup.py @@ -19,7 +19,7 @@ args = { "packages": find_packages(include=['info','transport', 'transport.*'])} args["keywords"]=['mongodb','duckdb','couchdb','rabbitmq','file','read','write','s3','sqlite'] -args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','termcolor','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql','pyspark','pydrill'] +args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','termcolor','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql','pyspark','pydrill','sqlalchemy_drill'] args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git" args['scripts'] = ['bin/transport'] # if sys.version_info[0] == 2 : diff --git a/transport/iowrapper.py b/transport/iowrapper.py index e555ff5..2732003 100644 --- a/transport/iowrapper.py +++ b/transport/iowrapper.py @@ -50,9 +50,6 @@ class IO: for key in _data : _data[key] = str(_data[key]) if type(_data[key]) not in [list,dict] else json.dumps(_data[key]) self._logger.write(pd.DataFrame([_data])) #,table=self._logTable) - else: - print ([' ********** ']) - print (_args) def _init_plugins(self,_items): """ This function will load pipelined functions as a plugin loader @@ -105,9 +102,12 @@ class IReader(IO): _shape = [] for _segment in _data : _shape.append(list(_segment.shape)) - yield self._plugins.apply(_segment,self.log) - - self.log(action='streaming',object=self._agent._engine.name, input= {'shape':_shape}) + if self._plugins : + yield self._plugins.apply(_segment,self.log) + else: + 
yield _segment + _objectName = '.'.join([self._agent.__class__.__module__,self._agent.__class__.__name__]) + self.log(action='streaming',object=_objectName, input= {'shape':_shape}) def read(self,**_args): @@ -116,37 +116,28 @@ class IReader(IO): self._init_plugins(_args['plugins']) _data = self._agent.read(**_args) + _objectName = '.'.join([self._agent.__class__.__module__,self._agent.__class__.__name__]) if types.GeneratorType == type(_data): - if self._plugins : - return self._stream(_data) - else: - _count = 0 - for _segment in _data : - _count += 1 - yield _segment - self.log(action='streaming',object=_objectName, input= {'segments':_count}) + return self._stream(_data) + # if self._plugins : + # return self._stream(_data) + # else: + # _count = 0 + # for _segment in _data : + # _count += 1 + # yield _segment + # self.log(action='streaming',object=_objectName, input= {'segments':_count}) # return _data - else: - + elif type(_data) == pd.DataFrame : + _shape = _data.shape #[0,0] if not _data.shape[] else list(_data.shape) - self.log(action='read',object=_objectName, input=_data.shape) + self.log(action='read',object=_objectName, input=_shape) if self._plugins : _logs = [] _data = self._plugins.apply(_data,self.log) return _data - # if self._plugins and self._plugins.ratio() > 0 : - - # if types.GeneratorType == type(_data): - - # return self._stream(_data) - # else: - # _data = self._plugins.apply(_data) - # return _data - # else: - # self.log(action='read',object=self._agent._engine.name, input=_data.shape) - # return _data class IWriter(IO): lock = RLock() def __init__(self,_agent,pipeline=None,_logger=None): @@ -180,7 +171,6 @@ class IETL(IReader) : # super().__init__(transport.get.reader(**_args['source'])) super().__init__(transport.get.reader(**_source),_plugins) - # _logger = if 'target' in _args: self._targets = _args['target'] if type(_args['target']) == list else [_args['target']] @@ -189,6 +179,9 @@ class IETL(IReader) : self.jobs = [] # # If the 
parent is already multiprocessing + if 'token' in _source : + self._logToken = _source['token'] + self._sourceArgs = _source['args'] if 'args' in _source else None self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess'] # def run(self) : # """ @@ -196,7 +189,7 @@ class IETL(IReader) : # """ # return self.read() def run(self,**_args): - _data = super().read(**_args) + _data = super().read(**_args) if not self._sourceArgs else super().read(**self._sourceArgs) self._targets = [transport.get.writer(**_kwargs) for _kwargs in self._targets] if types.GeneratorType == type(_data): _index = 0 diff --git a/transport/other/callback.py b/transport/other/callback.py index aba2f02..97a3e27 100644 --- a/transport/other/callback.py +++ b/transport/other/callback.py @@ -9,7 +9,7 @@ import numpy as np import pandas as pd class Writer : - lock = Lock() + lock = None _queue = {'default':queue.Queue()} def __init__(self,**_args): self._cache = {} diff --git a/transport/sql/common.py b/transport/sql/common.py index 988a4ba..0b9a7e4 100644 --- a/transport/sql/common.py +++ b/transport/sql/common.py @@ -62,10 +62,13 @@ class Base: @TODO: Execution of stored procedures """ - if sql.lower().startswith('select') or sql.lower().startswith('with') : + + if sql.lower().replace('\n',' ').strip().startswith('select') or sql.lower().startswith('with') : if self._chunksize : + return pd.read_sql(sql,self._engine,chunksize=self._chunksize) else: + return pd.read_sql(sql,self._engine) else: _handler = self._engine.connect() @@ -114,7 +117,8 @@ class BaseReader(SQLBase): # print (dir (self)) _table = _args['table'] if 'table' in _args else self._table sql = f'SELECT * FROM {_table}' - return self.apply(sql) + + return self.apply(sql.replace('\n',' ').strip()) class BaseWriter (SQLBase): diff --git a/transport/warehouse/drill.py b/transport/warehouse/drill.py index 551ac6f..71f0e64 100644 --- a/transport/warehouse/drill.py +++ b/transport/warehouse/drill.py @@ 
-4,6 +4,7 @@ from .. sql.common import BaseReader , BaseWriter import sqlalchemy as sqa class Drill : + __template = {'host':None,'port':None,'ssl':None,'table':None,'database':None} def __init__(self,**_args): self._host = _args['host'] if 'host' in _args else 'localhost' @@ -19,9 +20,9 @@ class Drill : self._database=_args['database'] self._schema = self._database.split('.')[0] - + def _get_uri(self,**_args): - return f'drill+sadrill://{self._host}:{self._port}/{self._schema}?use_ssl={self._ssl}' + return f'drill+sadrill://{self._host}:{self._port}/{self._database}?use_ssl={self._ssl}' def get_provider(self): return "drill+sadrill" def get_default_port(self): diff --git a/transport/warehouse/iceberg.py b/transport/warehouse/iceberg.py index 8d54300..77e16ad 100644 --- a/transport/warehouse/iceberg.py +++ b/transport/warehouse/iceberg.py @@ -64,7 +64,11 @@ class Iceberg : _datName = self._databaseName if 'database' not in _args else _args['database'] return '.'.join([_catName,_datName]) - + def apply(self,_query): + """ + sql query/command to run against apache iceberg + """ + return self._session.sql(_query) def has (self,**_args): try: _prefix = self._getPrefix(**_args) @@ -74,8 +78,7 @@ class Iceberg : except Exception as e: print (e) return False - def apply(self,sql): - pass + def close(self): self._session.stop() class Reader(Iceberg) :