refactor: factory, etl, fixes: session

pull/6/head
Steve Nyemba 1 year ago
parent 324d81bd16
commit 3f7f3d7306

@ -14,19 +14,27 @@ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLI
Usage : Usage :
transport --config <path-to-file.json> --procs <number-procs> transport help -- will print this page
@TODO: Create tables if they don't exist for relational databases
example of configuration :
1. Move data from a folder to a data-store transport move <path> [index]
transport [--folder <path> ] --config <config.json> #-- assuming the configuration doesn't have folder <path> path to the configuration file
transport --folder <path> --provider <postgresql|mongo|sqlite> --<database|db> <name> --table|doc <document_name> <index> optional index within the configuration file
In this case the configuration should look like :
{folder:..., target:{}}
2. Move data from one source to another
transport --config <file.json>
{source:{..},target:{..}} or [{source:{..},target:{..}},{source:{..},target:{..}}]
e.g: configuration file (JSON formatted)
- single source to a single target
{"source":{"provider":"http","url":"https://cdn.wsform.com/wp-content/uploads/2020/06/agreement.csv"}
"target":{"provider":"sqlite3","path":"transport-demo.sqlite","table":"agreement"}
}
- single source to multiple targets
{
"source":{"provider":"http","url":"https://cdn.wsform.com/wp-content/uploads/2020/06/agreement.csv"},
"target":[
{"provider":"sqlite3","path":"transport-demo.sqlite","table":"agreement},
{"provider":"mongodb","db":"transport-demo","collection":"agreement"}
]
}
""" """
import pandas as pd import pandas as pd
@ -36,51 +44,111 @@ import sys
import transport import transport
import time import time
from multiprocessing import Process from multiprocessing import Process
SYS_ARGS = {} import typer
if len(sys.argv) > 1: import os
from transport import etl
N = len(sys.argv) from transport import providers
for i in range(1,N):
value = None # SYS_ARGS = {}
if sys.argv[i].startswith('--'): # if len(sys.argv) > 1:
key = sys.argv[i][2:] #.replace('-','')
SYS_ARGS[key] = 1 # N = len(sys.argv)
if i + 1 < N: # for i in range(1,N):
value = sys.argv[i + 1] = sys.argv[i+1].strip() # value = None
if key and value and not value.startswith('--'): # if sys.argv[i].startswith('--'):
SYS_ARGS[key] = value # key = sys.argv[i][2:] #.replace('-','')
# SYS_ARGS[key] = 1
# if i + 1 < N:
i += 2 # value = sys.argv[i + 1] = sys.argv[i+1].strip()
# if key and value and not value.startswith('--'):
if __name__ == '__main__' : # SYS_ARGS[key] = value
#
# Load information from the file ...
if 'help' in SYS_ARGS : # i += 2
print (__doc__)
else: app = typer.Typer()
try:
_info = json.loads(open(SYS_ARGS['config']).read()) # @app.command()
if 'index' in SYS_ARGS : def help() :
_index = int(SYS_ARGS['index']) print (__doc__)
_info = [_item for _item in _info if _info.index(_item) == _index] def wait(jobs):
pass while jobs :
elif 'id' in SYS_ARGS : jobs = [thread for thread in jobs if thread.is_alive()]
_info = [_item for _item in _info if 'id' in _item and _item['id'] == SYS_ARGS['id']] time.sleep(1)
procs = 1 if 'procs' not in SYS_ARGS else int(SYS_ARGS['procs']) @app.command()
jobs = transport.factory.instance(provider='etl',info=_info,procs=procs) def move (path,index=None):
print ([len(jobs),' Jobs are running'])
N = len(jobs) _proxy = lambda _object: _object.write(_object.read())
while jobs : if os.path.exists(path):
x = len(jobs) file = open(path)
jobs = [_job for _job in jobs if _job.is_alive()] _config = json.loads (file.read() )
if x != len(jobs) : file.close()
print ([len(jobs),'... jobs still running']) if index :
time.sleep(1) _config = _config[ int(index)]
print ([N,' Finished running']) etl.instance(**_config)
except Exception as e: else:
etl.instance(_config)
print (e)
#
# if type(_config) == dict :
# _object = transport.etl.instance(**_config)
# _proxy(_object)
# else:
# #
# # here we are dealing with a list of objects (long ass etl job)
# jobs = []
# failed = []
# for _args in _config :
# if index and _config.index(_args) != index :
# continue
# _object=transport.etl.instance(**_args)
# thread = Process(target=_proxy,args=(_object,))
# thread.start()
# jobs.append(thread())
# if _config.index(_args) == 0 :
# thread.join()
wait(jobs)
@app.command()
def generate (path:str):
__doc__="""
"""
_config = [{"source":{"provider":"http","url":"https://cdn.wsform.com/wp-content/uploads/2020/06/agreement.csv"},"target":{"provider":"file","path":"addresses.csv","delimiter":"csv"}}]
file = open(path,'w')
file.write(json.dumps(_config))
file.close()
# if __name__ == '__main__' :
# #
# # Load information from the file ...
# if 'help' in SYS_ARGS :
# print (__doc__)
# else:
# try:
# _info = json.loads(open(SYS_ARGS['config']).read())
# if 'index' in SYS_ARGS :
# _index = int(SYS_ARGS['index'])
# _info = [_item for _item in _info if _info.index(_item) == _index]
# pass
# elif 'id' in SYS_ARGS :
# _info = [_item for _item in _info if 'id' in _item and _item['id'] == SYS_ARGS['id']]
# procs = 1 if 'procs' not in SYS_ARGS else int(SYS_ARGS['procs'])
# jobs = transport.factory.instance(provider='etl',info=_info,procs=procs)
# print ([len(jobs),' Jobs are running'])
# N = len(jobs)
# while jobs :
# x = len(jobs)
# jobs = [_job for _job in jobs if _job.is_alive()]
# if x != len(jobs) :
# print ([len(jobs),'... jobs still running'])
# time.sleep(1)
# print ([N,' Finished running'])
# except Exception as e:
# print (e)

@ -17,7 +17,7 @@ args = {
"license":"MIT", "license":"MIT",
"packages":["transport"]} "packages":["transport"]}
args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite'] args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite']
args["install_requires"] = ['pymongo','sqlalchemy<2.0.0','pandas','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python'] args["install_requires"] = ['pymongo','sqlalchemy<2.0.0','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python']
args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git" args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git"
args['scripts'] = ['bin/transport'] args['scripts'] = ['bin/transport']
if sys.version_info[0] == 2 : if sys.version_info[0] == 2 :

@ -28,7 +28,7 @@ import importlib
import sys import sys
import sqlalchemy import sqlalchemy
if sys.version_info[0] > 2 : if sys.version_info[0] > 2 :
from transport.common import Reader, Writer,Console #, factory # from transport.common import Reader, Writer,Console #, factory
from transport import disk from transport import disk
from transport import s3 as s3 from transport import s3 as s3
@ -97,7 +97,7 @@ class factory :
TYPE = {"sql":{"providers":["postgresql","mysql","neteeza","bigquery","mariadb","redshift"]}} TYPE = {"sql":{"providers":["postgresql","mysql","neteeza","bigquery","mariadb","redshift"]}}
PROVIDERS = { PROVIDERS = {
"etl":{"class":{"read":etl.instance,"write":etl.instance}}, "etl":{"class":{"read":etl.instance,"write":etl.instance}},
"console":{"class":{"write":Console,"read":Console}}, # "console":{"class":{"write":Console,"read":Console}},
"file":{"class":{"read":disk.DiskReader,"write":disk.DiskWriter}}, "file":{"class":{"read":disk.DiskReader,"write":disk.DiskWriter}},
"sqlite":{"class":{"read":disk.SQLiteReader,"write":disk.SQLiteWriter}}, "sqlite":{"class":{"read":disk.SQLiteReader,"write":disk.SQLiteWriter}},
"postgresql":{"port":5432,"host":"localhost","database":None,"driver":pg,"default":{"type":"VARCHAR"},"class":{"read":sql.SQLReader,"write":sql.SQLWriter}}, "postgresql":{"port":5432,"host":"localhost","database":None,"driver":pg,"default":{"type":"VARCHAR"},"class":{"read":sql.SQLReader,"write":sql.SQLWriter}},
@ -124,6 +124,9 @@ class factory :
# #
# Legacy code being returned # Legacy code being returned
return factory._instance(**_args); return factory._instance(**_args);
else: else:
return instance(**_args) return instance(**_args)
@staticmethod @staticmethod
@ -175,22 +178,31 @@ def instance(**_pargs):
file.close() file.close()
_provider = _args['provider'] _provider = _args['provider']
_group = None _context = list( set(['read','write','listen']) & set(_args.keys()) )
if _context :
_context = _context[0]
else:
_context = _args['context'] if 'context' in _args else 'read'
# _group = None
# for _id in providers.CATEGORIES :
# if _provider in providers.CATEGORIES[_id] :
# _group = _id
# break
# if _group :
for _id in providers.CATEGORIES : if _provider in providers.PROVIDERS and _context in providers.PROVIDERS[_provider]:
if _provider in providers.CATEGORIES[_id] :
_group = _id
break
if _group :
_classPointer = _getClassInstance(_group,**_args) # _classPointer = _getClassInstance(_group,**_args)
_classPointer = providers.PROVIDERS[_provider][_context]
# #
# Let us reformat the arguments # Let us reformat the arguments
if 'read' in _args or 'write' in _args : # if 'read' in _args or 'write' in _args :
_args = _args['read'] if 'read' in _args else _args['write'] # _args = _args['read'] if 'read' in _args else _args['write']
_args['provider'] = _provider # _args['provider'] = _provider
if _group == 'sql' : # if _group == 'sql' :
if _provider in providers.CATEGORIES['sql'] :
_info = _get_alchemyEngine(**_args) _info = _get_alchemyEngine(**_args)
_args = dict(_args,**_info) _args = dict(_args,**_info)
@ -215,57 +227,68 @@ def _get_alchemyEngine(**_args):
This function returns the SQLAlchemy engine associated with parameters, This is only applicable for SQL _items This function returns the SQLAlchemy engine associated with parameters, This is only applicable for SQL _items
:_args arguments passed to the factory {provider and other} :_args arguments passed to the factory {provider and other}
""" """
#@TODO: Enable authentication files (private_key)
_username = _args['username'] if 'username' in _args else ''
_password = _args['password'] if 'password' in _args else ''
_account = _args['account'] if 'account' in _args else ''
_database = _args['database']
_provider = _args['provider'] _provider = _args['provider']
if _username != '': _pargs = {}
_account = _username + ':'+_password+'@' if _provider == providers.SQLITE3 :
_host = _args['host'] if 'host' in _args else '' _path = _args['database'] if 'database' in _args else _args['path']
_port = _args['port'] if 'port' in _args else '' uri = ''.join([_provider,':///',_path])
if _provider in providers.DEFAULT :
_default = providers.DEFAULT[_provider]
_host = _host if _host != '' else (_default['host'] if 'host' in _default else '')
_port = _port if _port != '' else (_default['port'] if 'port' in _default else '')
if _port == '':
_port = providers.DEFAULT['port'] if 'port' in providers.DEFAULT else ''
#
if _host != '' and _port != '' :
_fhost = _host+":"+str(_port) #--formatted hostname
else: else:
_fhost = _host
# Let us update the parameters we have thus far #@TODO: Enable authentication files (private_key)
_username = _args['username'] if 'username' in _args else ''
_password = _args['password'] if 'password' in _args else ''
_account = _args['account'] if 'account' in _args else ''
_database = _args['database'] if 'database' in _args else _args['path']
if _username != '':
_account = _username + ':'+_password+'@'
_host = _args['host'] if 'host' in _args else ''
_port = _args['port'] if 'port' in _args else ''
if _provider in providers.DEFAULT :
_default = providers.DEFAULT[_provider]
_host = _host if _host != '' else (_default['host'] if 'host' in _default else '')
_port = _port if _port != '' else (_default['port'] if 'port' in _default else '')
if _port == '':
_port = providers.DEFAULT['port'] if 'port' in providers.DEFAULT else ''
#
if _host != '' and _port != '' :
_fhost = _host+":"+str(_port) #--formatted hostname
else:
_fhost = _host
# Let us update the parameters we have thus far
# #
uri = ''.join([_provider,"://",_account,_fhost,'/',_database]) uri = ''.join([_provider,"://",_account,_fhost,'/',_database])
_pargs = {'host':_host,'port':_port,'username':_username,'password':_password}
_engine = sqlalchemy.create_engine (uri,future=True) _engine = sqlalchemy.create_engine (uri,future=True)
_out = {'sqlalchemy':_engine} _out = {'sqlalchemy':_engine}
_pargs = {'host':_host,'port':_port,'username':_username,'password':_password}
for key in _pargs : for key in _pargs :
if _pargs[key] != '' : if _pargs[key] != '' :
_out[key] = _pargs[key] _out[key] = _pargs[key]
return _out return _out
@DeprecationWarning
def _getClassInstance(_group,**_args): def _getClassInstance(_group,**_args):
""" """
This function returns the class instance we are attempting to instanciate This function returns the class instance we are attempting to instanciate
:_group items in providers.CATEGORIES.keys() :_group items in providers.CATEGORIES.keys()
:_args arguments passed to the factory class :_args arguments passed to the factory class
""" """
if 'read' in _args or 'write' in _args : # if 'read' in _args or 'write' in _args :
_context = 'read' if 'read' in _args else _args['write'] # _context = 'read' if 'read' in _args else _args['write']
_info = _args[_context] # _info = _args[_context]
else: # else:
_context = _args['context'] if 'context' in _args else 'read' # _context = _args['context'] if 'context' in _args else 'read'
_class = providers.READ[_group] if _context == 'read' else providers.WRITE[_group] # _class = providers.READ[_group] if _context == 'read' else providers.WRITE[_group]
if type(_class) == dict and _args['provider'] in _class: # if type(_class) == dict and _args['provider'] in _class:
_class = _class[_args['provider']] # _class = _class[_args['provider']]
return _class # return _class
@DeprecationWarning
def __instance(**_args): def __instance(**_args):
""" """

@ -93,29 +93,29 @@ class ReadWriter(Reader,Writer) :
This class implements the read/write functions aggregated This class implements the read/write functions aggregated
""" """
pass pass
class Console(Writer): # class Console(Writer):
lock = RLock() # lock = RLock()
def __init__(self,**_args): # def __init__(self,**_args):
self.lock = _args['lock'] if 'lock' in _args else False # self.lock = _args['lock'] if 'lock' in _args else False
self.info = self.write # self.info = self.write
self.debug = self.write # self.debug = self.write
self.log = self.write # self.log = self.write
pass # pass
def write (self,logs=None,**_args): # def write (self,logs=None,**_args):
if self.lock : # if self.lock :
Console.lock.acquire() # Console.lock.acquire()
try: # try:
_params = _args if logs is None and _args else logs # _params = _args if logs is None and _args else logs
if type(_params) == list: # if type(_params) == list:
for row in _params : # for row in _params :
print (row) # print (row)
else: # else:
print (_params) # print (_params)
except Exception as e : # except Exception as e :
print (e) # print (e)
finally: # finally:
if self.lock : # if self.lock :
Console.lock.release() # Console.lock.release()
""" """

@ -35,6 +35,9 @@ import json
import sys import sys
import transport import transport
import time import time
import os
from multiprocessing import Process from multiprocessing import Process
SYS_ARGS = {} SYS_ARGS = {}
if len(sys.argv) > 1: if len(sys.argv) > 1:
@ -52,199 +55,301 @@ if len(sys.argv) > 1:
i += 2 i += 2
class Transporter(Process):
class Post(Process): """
def __init__(self,**args): The transporter (Jason Stathem) moves data from one persistant store to another
super().__init__() - callback functions
self.store = args['target'] :onFinish callback function when finished
if 'provider' not in args['target'] : :onError callback function when an error occurs
pass :source source data specification
self.PROVIDER = args['target']['type'] :target destination(s) to move the data to
# self.writer = transport.factory.instance(**args['target']) """
else:
self.PROVIDER = args['target']['provider']
self.store['context'] = 'write'
# self.store = args['target']
self.store['lock'] = True
# self.writer = transport.instance(**args['target'])
#
# If the table doesn't exists maybe create it ?
#
self.rows = args['rows']
# self.rows = args['rows'].fillna('')
def log(self,**_args) :
if ETL.logger :
ETL.logger.info(**_args)
def run(self):
_info = {"values":self.rows} if 'couch' in self.PROVIDER else self.rows
writer = transport.factory.instance(**self.store)
writer.write(_info)
writer.close()
class ETL (Process):
logger = None
def __init__(self,**_args): def __init__(self,**_args):
super().__init__() super().__init__()
# self.onfinish = _args['onFinish']
self.name = _args['id'] if 'id' in _args else 'UNREGISTERED' # self._onerror = _args['onError']
# if 'provider' not in _args['source'] : self._source = _args['source']
# #@deprecate
# self.reader = transport.factory.instance(**_args['source'])
# else:
# #
# # This is the new interface
# _args['source']['context'] = 'read'
# self.reader = transport.instance(**_args['source'])
#
# do we have an sql query provided or not ....
# self.sql = _args['source']['sql'] if 'sql' in _args['source'] else None
# self.cmd = _args['source']['cmd'] if 'cmd' in _args['source'] else None
# self._oargs = _args['target'] #transport.factory.instance(**_args['target'])
self._source = _args ['source']
self._target = _args['target'] self._target = _args['target']
self._source['context'] = 'read'
self._target['context'] = 'write'
self.JOB_COUNT = _args['jobs']
self.jobs = []
# self.logger = transport.factory.instance(**_args['logger'])
def log(self,**_args) :
if ETL.logger :
ETL.logger.info(**_args)
def run(self):
# if self.cmd :
# idf = self.reader.read(**self.cmd)
# else:
# idf = self.reader.read()
# idf = pd.DataFrame(idf)
# # idf = idf.replace({np.nan: None}, inplace = True)
# idf.columns = [str(name).replace("b'",'').replace("'","").strip() for name in idf.columns.tolist()]
# self.log(rows=idf.shape[0],cols=idf.shape[1],jobs=self.JOB_COUNT)
# #
# writing the data to a designated data source # Let's insure we can support multiple targets
self._target = [self._target] if type(self._target) != list else self._target
pass
def read(self,**_args):
"""
This function
"""
_reader = transport.factory.instance(**self._source)
# #
try: # If arguments are provided then a query is to be executed (not just a table dump)
return _reader.read() if 'args' not in self._source else _reader.read(**self._source['args'])
_log = {"name":self.name,"rows":{"input":0,"output":0}} def _delegate_write(self,_data,**_args):
_reader = transport.factory.instance(**self._source) """
if 'table' in self._source : This function will write a data-frame to a designated data-store, The function is built around a delegation design pattern
_df = _reader.read() :data data-frame or object to be written
"""
for _target in self._target :
if 'write' not in _target :
_target['context'] = 'write'
_target['lock'] = True
else: else:
_df = _reader.read(**self._source['cmd']) _target['write']['lock'] = True
_log['rows']['input'] = _df.shape[0] _writer = transport.factory.instance(**_target)
_writer.write(_data,**_args)
if hasattr(_writer,'close') :
_writer.close()
def write(self,_df,**_args):
"""
"""
SEGMENT_COUNT = 6
MAX_ROWS = 1000000
# _df = self.read()
_segments = np.array_split(np.range(_df.shape[0]),SEGMENT_COUNT) if _df.shape[0] > MAX_ROWS else np.array( [np.arange(_df.shape[0])])
# _index = 0
for _indexes in _segments :
_fwd_args = {} if not _args else _args
self._delegate_write(_df.iloc[_indexes],**_fwd_args)
# #
# Let's write the input data-frame to the target ... # @TODO: Perhaps consider writing up each segment in a thread/process (speeds things up?)
_writer = transport.factory.instance(**self._target) pass
_writer.write(_df)
_log['rows']['output'] = _df.shape[0]
# self.log(module='write',action='partitioning',jobs=self.JOB_COUNT)
# rows = np.array_split(np.arange(0,idf.shape[0]),self.JOB_COUNT)
# #
# # @TODO: locks
# for i in np.arange(self.JOB_COUNT) :
# # _id = ' '.join([str(i),' table ',self.name])
# indexes = rows[i]
# segment = idf.loc[indexes,:].copy() #.to_dict(orient='records')
# _name = "partition-"+str(i)
# if segment.shape[0] == 0 :
# continue
# proc = Post(target = self._oargs,rows = segment,name=_name)
# self.jobs.append(proc)
# proc.start()
# self.log(module='write',action='working',segment=str(self.name),table=self.name,rows=segment.shape[0])
# while self.jobs :
# jobs = [job for job in proc if job.is_alive()]
# time.sleep(1)
except Exception as e:
print (e)
self.log(**_log)
def is_done(self):
self.jobs = [proc for proc in self.jobs if proc.is_alive()]
return len(self.jobs) == 0
def instance(**_args):
"""
:path ,index, id
:param _info list of objects with {source,target}`
:param logger
"""
logger = _args['logger'] if 'logger' in _args else None
if 'path' in _args :
_info = json.loads((open(_args['path'])).read())
if 'index' in _args :
_index = int(_args['index'])
_info = _info[_index]
elif 'id' in _args :
_info = [_item for _item in _info if '_id' in _item and _item['id'] == _args['id']]
_info = _info[0] if _info else _info
else:
_info = _args['info']
if logger and type(logger) != str:
ETL.logger = logger
elif logger == 'console':
ETL.logger = transport.factory.instance(provider='console',context='write',lock=True)
if type(_info) in [list,dict] :
_info = _info if type(_info) != dict else [_info]
#
# The assumption here is that the objects within the list are {source,target}
jobs = []
for _item in _info :
_item['jobs'] = 5 if 'procs' not in _args else int(_args['procs']) def instance(**_args):
_job = ETL(**_item) _proxy = lambda _agent: _agent.write(_agent.read())
if 'source' in _args and 'target' in _args :
_job.start() _agent = Transporter(**_args)
jobs.append(_job) _proxy(_agent)
return jobs
else: else:
return None _config = _args['config']
_items = [Transporter(**_item) for _item in _config ]
if __name__ == '__main__' : _MAX_JOBS = 5
_info = json.loads(open (SYS_ARGS['config']).read()) _items = np.array_split(_items,_MAX_JOBS)
index = int(SYS_ARGS['index']) if 'index' in SYS_ARGS else None for _batch in _items :
procs = [] jobs = []
for _config in _info : for _item in _batch :
if 'source' in SYS_ARGS : thread = Process(target=_proxy,args = (_item,))
_config['source'] = {"type":"disk.DiskReader","args":{"path":SYS_ARGS['source'],"delimiter":","}} thread.start()
jobs.append(thread)
_config['jobs'] = 3 if 'jobs' not in SYS_ARGS else int(SYS_ARGS['jobs']) while jobs :
etl = ETL (**_config) jobs = [thread for thread in jobs if thread.is_alive()]
if index is None: time.sleep(1)
etl.start() pass
procs.append(etl) # class Post(Process):
# def __init__(self,**args):
elif _info.index(_config) == index : # super().__init__()
# self.store = args['target']
# print (_config) # if 'provider' not in args['target'] :
procs = [etl] # pass
etl.start() # self.PROVIDER = args['target']['type']
break # # self.writer = transport.factory.instance(**args['target'])
# # else:
# # self.PROVIDER = args['target']['provider']
N = len(procs) # self.store['context'] = 'write'
while procs : # # self.store = args['target']
procs = [thread for thread in procs if not thread.is_done()] # self.store['lock'] = True
if len(procs) < N : # # self.writer = transport.instance(**args['target'])
print (["Finished ",(N-len(procs)), " remaining ", len(procs)]) # #
N = len(procs) # # If the table doesn't exists maybe create it ?
time.sleep(1) # #
# print ("We're done !!") # self.rows = args['rows']
# # self.rows = args['rows'].fillna('')
# def log(self,**_args) :
# if ETL.logger :
# ETL.logger.info(**_args)
# def run(self):
# _info = {"values":self.rows} if 'couch' in self.PROVIDER else self.rows
# writer = transport.factory.instance(**self.store)
# writer.write(_info)
# writer.close()
# class ETL (Process):
# logger = None
# def __init__(self,**_args):
# super().__init__()
# self.name = _args['id'] if 'id' in _args else 'UNREGISTERED'
# # if 'provider' not in _args['source'] :
# # #@deprecate
# # self.reader = transport.factory.instance(**_args['source'])
# # else:
# # #
# # # This is the new interface
# # _args['source']['context'] = 'read'
# # self.reader = transport.instance(**_args['source'])
# #
# # do we have an sql query provided or not ....
# # self.sql = _args['source']['sql'] if 'sql' in _args['source'] else None
# # self.cmd = _args['source']['cmd'] if 'cmd' in _args['source'] else None
# # self._oargs = _args['target'] #transport.factory.instance(**_args['target'])
# self._source = _args ['source']
# self._target = _args['target']
# self._source['context'] = 'read'
# self._target['context'] = 'write'
# self.JOB_COUNT = _args['jobs']
# self.jobs = []
# # self.logger = transport.factory.instance(**_args['logger'])
# def log(self,**_args) :
# if ETL.logger :
# ETL.logger.info(**_args)
# def run(self):
# # if self.cmd :
# # idf = self.reader.read(**self.cmd)
# # else:
# # idf = self.reader.read()
# # idf = pd.DataFrame(idf)
# # # idf = idf.replace({np.nan: None}, inplace = True)
# # idf.columns = [str(name).replace("b'",'').replace("'","").strip() for name in idf.columns.tolist()]
# # self.log(rows=idf.shape[0],cols=idf.shape[1],jobs=self.JOB_COUNT)
# #
# # writing the data to a designated data source
# #
# try:
# _log = {"name":self.name,"rows":{"input":0,"output":0}}
# _reader = transport.factory.instance(**self._source)
# if 'table' in self._source :
# _df = _reader.read()
# else:
# _df = _reader.read(**self._source['cmd'])
# _log['rows']['input'] = _df.shape[0]
# #
# # Let's write the input data-frame to the target ...
# _writer = transport.factory.instance(**self._target)
# _writer.write(_df)
# _log['rows']['output'] = _df.shape[0]
# # self.log(module='write',action='partitioning',jobs=self.JOB_COUNT)
# # rows = np.array_split(np.arange(0,idf.shape[0]),self.JOB_COUNT)
# # #
# # # @TODO: locks
# # for i in np.arange(self.JOB_COUNT) :
# # # _id = ' '.join([str(i),' table ',self.name])
# # indexes = rows[i]
# # segment = idf.loc[indexes,:].copy() #.to_dict(orient='records')
# # _name = "partition-"+str(i)
# # if segment.shape[0] == 0 :
# # continue
# # proc = Post(target = self._oargs,rows = segment,name=_name)
# # self.jobs.append(proc)
# # proc.start()
# # self.log(module='write',action='working',segment=str(self.name),table=self.name,rows=segment.shape[0])
# # while self.jobs :
# # jobs = [job for job in proc if job.is_alive()]
# # time.sleep(1)
# except Exception as e:
# print (e)
# self.log(**_log)
# def is_done(self):
# self.jobs = [proc for proc in self.jobs if proc.is_alive()]
# return len(self.jobs) == 0
# def instance (**_args):
# """
# path to configuration file
# """
# _path = _args['path']
# _config = {}
# jobs = []
# if os.path.exists(_path) :
# file = open(_path)
# _config = json.loads(file.read())
# file.close()
# if _config and type
# def _instance(**_args):
# """
# :path ,index, id
# :param _info list of objects with {source,target}`
# :param logger
# """
# logger = _args['logger'] if 'logger' in _args else None
# if 'path' in _args :
# _info = json.loads((open(_args['path'])).read())
# if 'index' in _args :
# _index = int(_args['index'])
# _info = _info[_index]
# elif 'id' in _args :
# _info = [_item for _item in _info if '_id' in _item and _item['id'] == _args['id']]
# _info = _info[0] if _info else _info
# else:
# _info = _args['info']
# if logger and type(logger) != str:
# ETL.logger = logger
# elif logger == 'console':
# ETL.logger = transport.factory.instance(provider='console',context='write',lock=True)
# if type(_info) in [list,dict] :
# _info = _info if type(_info) != dict else [_info]
# #
# # The assumption here is that the objects within the list are {source,target}
# jobs = []
# for _item in _info :
# _item['jobs'] = 5 if 'procs' not in _args else int(_args['procs'])
# _job = ETL(**_item)
# _job.start()
# jobs.append(_job)
# return jobs
# else:
# return None
# if __name__ == '__main__' :
# _info = json.loads(open (SYS_ARGS['config']).read())
# index = int(SYS_ARGS['index']) if 'index' in SYS_ARGS else None
# procs = []
# for _config in _info :
# if 'source' in SYS_ARGS :
# _config['source'] = {"type":"disk.DiskReader","args":{"path":SYS_ARGS['source'],"delimiter":","}}
# _config['jobs'] = 3 if 'jobs' not in SYS_ARGS else int(SYS_ARGS['jobs'])
# etl = ETL (**_config)
# if index is None:
# etl.start()
# procs.append(etl)
# elif _info.index(_config) == index :
# # print (_config)
# procs = [etl]
# etl.start()
# break
# #
# #
# N = len(procs)
# while procs :
# procs = [thread for thread in procs if not thread.is_done()]
# if len(procs) < N :
# print (["Finished ",(N-len(procs)), " remaining ", len(procs)])
# N = len(procs)
# time.sleep(1)
# # print ("We're done !!")

@ -1,4 +1,4 @@
from transport.common import Reader, Writer,Console #, factory # from transport.common import Reader, Writer,Console #, factory
from transport import disk from transport import disk
import sqlite3 import sqlite3
from transport import s3 as s3 from transport import s3 as s3
@ -9,6 +9,7 @@ from transport import sql as sql
from transport import etl as etl from transport import etl as etl
from transport import qlistener from transport import qlistener
from transport import bricks from transport import bricks
from transport import session
import psycopg2 as pg import psycopg2 as pg
import mysql.connector as my import mysql.connector as my
from google.cloud import bigquery as bq from google.cloud import bigquery as bq
@ -33,6 +34,8 @@ MARIADB = 'mariadb'
COUCHDB = 'couch' COUCHDB = 'couch'
CONSOLE = 'console' CONSOLE = 'console'
ETL = 'etl' ETL = 'etl'
# #
# synonyms of the above # synonyms of the above
BQ = BIGQUERY BQ = BIGQUERY
@ -54,13 +57,37 @@ CATEGORIES ={'sql':[NETEZZA,PG,MYSQL,REDSHIFT,SQLITE,MARIADB],'nosql':[MONGODB,C
READ = {'sql':sql.SQLReader,'nosql':{MONGODB:mongo.MongoReader,COUCHDB:couch.CouchReader}, READ = {'sql':sql.SQLReader,'nosql':{MONGODB:mongo.MongoReader,COUCHDB:couch.CouchReader},
'cloud':{BIGQUERY:sql.BigQueryReader,DATABRICKS:bricks.BricksReader}, 'cloud':{BIGQUERY:sql.BigQueryReader,DATABRICKS:bricks.BricksReader},
'file':disk.DiskReader,'queue':{RABBIT:queue.QueueReader,QLISTENER:qlistener.qListener}, 'file':disk.DiskReader,'queue':{RABBIT:queue.QueueReader,QLISTENER:qlistener.qListener},
'cli':{CONSOLE:Console},'memory':{CONSOLE:Console} # 'cli':{CONSOLE:Console},'memory':{CONSOLE:Console},'http':session.HttpReader
} }
WRITE = {'sql':sql.SQLWriter,'nosql':{MONGODB:mongo.MongoWriter,COUCHDB:couch.CouchWriter}, WRITE = {'sql':sql.SQLWriter,'nosql':{MONGODB:mongo.MongoWriter,COUCHDB:couch.CouchWriter},
'cloud':{BIGQUERY:sql.BigQueryWriter,DATABRICKS:bricks.BricksWriter}, 'cloud':{BIGQUERY:sql.BigQueryWriter,DATABRICKS:bricks.BricksWriter},
'file':disk.DiskWriter,'queue':{RABBIT:queue.QueueWriter,QLISTENER:qlistener.qListener},'cli':{CONSOLE:Console},'memory':{CONSOLE:Console} 'file':disk.DiskWriter,'queue':{RABBIT:queue.QueueWriter,QLISTENER:qlistener.qListener},
# 'cli':{CONSOLE:Console},
# 'memory':{CONSOLE:Console}, 'http':session.HttpReader
} }
# SQL_PROVIDERS = [POSTGRESQL,MYSQL,NETEZZA,MARIADB,SQLITE]
PROVIDERS = {
FILE:{'read':disk.DiskReader,'write':disk.DiskWriter},
SQLITE:{'read':disk.SQLiteReader,'write':disk.SQLiteWriter,'driver':sqlite3},
POSTGRESQL:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':pg,'default':{'host':'localhost','port':5432}},
NETEZZA:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':nz,'default':{'port':5480}},
REDSHIFT:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':pg,'default':{'host':'localhost','port':5432}},
RABBITMQ:{'read':queue.QueueReader,'writer':queue.QueueWriter,'context':queue.QueueListener,'default':{'host':'localhost','port':5432}},
MYSQL:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':my,'default':{'host':'localhost','port':3306}},
MARIADB:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':my,'default':{'host':'localhost','port':3306}},
S3:{'read':s3.s3Reader,'write':s3.s3Writer},
BIGQUERY:{'read':sql.BigQueryReader,'write':sql.BigQueryWriter},
QLISTENER:{'read':qlistener.qListener,'write':qlistener.qListener,'default':{'host':'localhost','port':5672}},
CONSOLE:{'read':qlistener.Console,"write":qlistener.Console},
HTTP:{'read':session.HttpReader,'write':session.HttpWriter},
DATABRICKS:{'read':bricks.BricksReader,'write':bricks.BricksWriter},
MONGODB:{'read':mongo.MongoReader,'write':mongo.MongoWriter,'default':{'port':27017,'host':'localhost'}},
COUCHDB:{'read':couch.CouchReader,'writer':couch.CouchWriter,'default':{'host':'localhost','port':5984}},
ETL :{'read':etl.Transporter,'write':etl.Transporter}
}
DEFAULT = {PG:{'host':'localhost','port':5432},MYSQL:{'host':'localhost','port':3306}} DEFAULT = {PG:{'host':'localhost','port':5432},MYSQL:{'host':'localhost','port':3306}}
DEFAULT[MONGODB] = {'port':27017,'host':'localhost'} DEFAULT[MONGODB] = {'port':27017,'host':'localhost'}
DEFAULT[REDSHIFT] = DEFAULT[PG] DEFAULT[REDSHIFT] = DEFAULT[PG]

@ -40,3 +40,8 @@ class qListener :
_q = qListener._queue[_id] _q = qListener._queue[_id]
_q.put(_data) _q.put(_data)
_q.join() _q.join()
class Console (qListener):
def __init__(self,**_args):
super().__init__(callback=print)
# self.callback = print

@ -1,54 +1,60 @@
from flask import request, session from flask import request, session
from datetime import datetime from datetime import datetime
import re import re
from common import Reader, Writer from transport.common import Reader, Writer
import json import json
import requests
from io import StringIO
import pandas as pd
class HttpRequestReader(Reader):
class HttpReader(Reader):
""" """
This class is designed to read data from an Http request file handler provided to us by flask This class is designed to read data from an Http request file handler provided to us by flask
The file will be heald in memory and processed accordingly The file will be heald in memory and processed accordingly
NOTE: This is inefficient and can crash a micro-instance (becareful) NOTE: This is inefficient and can crash a micro-instance (becareful)
""" """
def __init__(self,**params): def __init__(self,**_args):
self.file_length = 0 self._url = _args['url']
try: self._headers = None if 'headers' not in _args else _args['headers']
#self.file = params['file']
#self.file.seek(0, os.SEEK_END)
#self.file_length = self.file.tell()
#print 'size of file ',self.file_length # def isready(self):
self.content = params['file'].readlines() # return self.file_length > 0
self.file_length = len(self.content) def format(self,_response):
except Exception as e: _mimetype= _response.headers['Content-Type']
print ("Error ... ",e) if _mimetype == 'text/csv' or 'text/csv':
pass _content = _response.text
return pd.read_csv(StringIO(_content))
#
# @TODO: Add support for excel, JSON and other file formats that fit into a data-frame
#
def isready(self): return _response.text
return self.file_length > 0 def read(self,**_args):
def read(self,size =-1): if self._headers :
i = 1 r = requests.get(self._url,headers = self._headers)
for row in self.content: else:
i += 1 r = requests.get(self._url,headers = self._headers)
if size == i: return self.format(r)
break
yield row
class HttpSessionWriter(Writer): class HttpWriter(Writer):
""" """
This class is designed to write data to a session/cookie This class is designed to submit data to an endpoint (url)
""" """
def __init__(self,**params): def __init__(self,**_args):
""" """
@param key required session key @param key required session key
""" """
self.session = params['queue'] self._url = _args['url']
self.session['sql'] = [] self._name = _args['name']
self.session['csv'] = [] self._method = 'post' if 'method' not in _args else _args['method']
self.tablename = re.sub('..+$','',params['filename'])
self.session['uid'] = params['uid'] # self.session = params['queue']
# self.session['sql'] = []
# self.session['csv'] = []
# self.tablename = re.sub('..+$','',params['filename'])
# self.session['uid'] = params['uid']
#self.xchar = params['xchar'] #self.xchar = params['xchar']
@ -57,10 +63,26 @@ class HttpSessionWriter(Writer):
return "".join(["INSERT INTO :table VALUES('",values,"');\n"]).replace(':table',self.tablename) return "".join(["INSERT INTO :table VALUES('",values,"');\n"]).replace(':table',self.tablename)
def isready(self): def isready(self):
return True return True
def write(self,**params): def write(self,_data,**_args):
label = params['label'] #
row = params ['row'] #
_method = self._method if 'method' not in _args else _args['method']
_method = _method.lower()
_mimetype = 'text/csv'
if type(_data) == dict :
_mimetype = 'application/json'
_content = _data
else:
_content = _data.to_dict(orient='records')
_headers = {'Content-Type':_mimetype}
_pointer = getattr(requests,_method)
_pointer ({self._name:_content},headers=_headers)
# label = params['label']
# row = params ['row']
if label == 'usable': # if label == 'usable':
self.session['csv'].append(self.format(row,',')) # self.session['csv'].append(self.format(row,','))
self.session['sql'].append(self.format_sql(row)) # self.session['sql'].append(self.format_sql(row))

@ -291,17 +291,17 @@ class SQLWriter(SQLRW,Writer):
""" """
# inspect = False if 'inspect' not in _args else _args['inspect'] # inspect = False if 'inspect' not in _args else _args['inspect']
# cast = False if 'cast' not in _args else _args['cast'] # cast = False if 'cast' not in _args else _args['cast']
if not self.fields : # if not self.fields :
if type(info) == list : # if type(info) == list :
_fields = info[0].keys() # _fields = info[0].keys()
elif type(info) == dict : # elif type(info) == dict :
_fields = info.keys() # _fields = info.keys()
elif type(info) == pd.DataFrame : # elif type(info) == pd.DataFrame :
_fields = info.columns.tolist() # _fields = info.columns.tolist()
# _fields = info.keys() if type(info) == dict else info[0].keys() # # _fields = info.keys() if type(info) == dict else info[0].keys()
_fields = list (_fields) # # _fields = list (_fields)
self.init(_fields) # self.init(_fields)
try: try:
table = _args['table'] if 'table' in _args else self.table table = _args['table'] if 'table' in _args else self.table

@ -1,2 +1,2 @@
__author__ = 'The Phi Technology' __author__ = 'The Phi Technology'
__version__= '1.8.6' __version__= '1.9.0'

Loading…
Cancel
Save