Compare commits


38 Commits
v2.4 ... master

Author  SHA1  Message  Date
Steve L. Nyemba  b461ce9d7b  Merge pull request 'v2.2.0' (#33) from v2.2.0 into master  (2 days ago)
Steve Nyemba  fbdb4a4931  bug fix: registry and emails  (2 days ago)
Steve Nyemba  6e1c420952  project file specification  (6 days ago)
Steve Nyemba  66d881fdda  upgrade pyproject.toml, bug fix with registry  (6 days ago)
Steve L. Nyemba  6c26588462  Merge pull request 'v2.2.0' (#32) from v2.2.0 into master  (2 weeks ago)
Steve Nyemba  de4e065ca6  bug fix with newer setuptools  (2 weeks ago)
Steve Nyemba  e035f5eba0  windows bug fix, environment variable  (2 weeks ago)
Steve Nyemba  6f8019f582  bug fix  (3 weeks ago)
Steve L. Nyemba  d3517a5720  Merge pull request 'bug fix: logger issue' (#31) from v2.2.0 into master  (3 weeks ago)
Steve Nyemba  b0cd0b85dc  bug fix: logger issue  (2 months ago)
Steve L. Nyemba  4c98e81c14  Merge pull request 'v2.2.0: bug fixes' (#30) from v2.2.0 into master  (3 months ago)
Steve Nyemba  4b34c746ae  bug fix: missing table  (3 months ago)
Steve Nyemba  0977ad1b18  setup fixes  (4 months ago)
Steve Nyemba  98ef8a848e  bug fixes and dependencies  (4 months ago)
Steve Nyemba  469c6f89a2  fixes with plugin handler  (4 months ago)
Steve Nyemba  dd10f6db78  bug fix: version & cli  (4 months ago)
Steve Nyemba  dad2956a8c  version update  (4 months ago)
Steve Nyemba  eaa2b99a2d  bug fix: schema (postgresql) construct  (4 months ago)
Steve Nyemba  a1b5f2743c  bug fixes ...  (5 months ago)
Steve Nyemba  afa442ea8d  versioning update edition  (5 months ago)
Steve Nyemba  30645e46bd  bug fix: readonly for duckdb  (5 months ago)
Steve Nyemba  cdf783143e  ...  (5 months ago)
Steve Nyemba  1a8112f152  adding iceberg notebook  (5 months ago)
Steve Nyemba  49ebd4a432  bug fix: close & etl  (5 months ago)
Steve Nyemba  c3627586b3  fix: refactor cli switches  (6 months ago)
Steve Nyemba  2a72de4cd6  bug fixes: registry and handling cli parameters as well as adding warehousing  (6 months ago)
Steve Nyemba  d0e655e7e3  update, community edition baseline  (8 months ago)
Steve L. Nyemba  492dc8f374  Merge pull request 'new provider console and bug fixes with applied commands' (#25) from v2.2.0 into master  (10 months ago)
Steve L. Nyemba  e848367378  Merge pull request 'bug fix, duckdb in-memory handling' (#24) from v2.2.0 into master  (10 months ago)
Steve L. Nyemba  c872ba8cc2  Merge pull request 'v2.2.0 - Bug fixes with mongodb, console' (#23) from v2.2.0 into master  (10 months ago)
Steve L. Nyemba  baa8164f16  Merge pull request 'aws s3 notebook, brief example' (#22) from v2.2.0 into master  (1 year ago)
Steve L. Nyemba  31556ebd32  Merge pull request 'v2.2.0 bug fix - AWS-S3' (#21) from v2.2.0 into master  (1 year ago)
Steve L. Nyemba  1e7839198a  Merge pull request 'v2.2.0 - shared environment support and duckdb support' (#20) from v2.2.0 into master  (1 year ago)
Steve L. Nyemba  dce50a967e  Merge pull request 'documentation ...' (#19) from v2.0.4 into master  (1 year ago)
Steve L. Nyemba  5ccb073865  Merge pull request 'refactor: etl,better reusability & streamlined and threaded' (#18) from v2.0.4 into master  (1 year ago)
Steve L. Nyemba  3081fb98e7  Merge pull request 'version 2.0 - Refactored, Plugins support' (#17) from v2.0 into master  (1 year ago)
Steve L. Nyemba  58959359ad  Merge pull request 'bug fix: psycopg2 with numpy' (#14) from dev into master  (1 year ago)
Steve L. Nyemba  68b8f6af5f  Merge pull request 'fixes 2024 pandas-gbq and sqlalchemy' (#10) from dev into master  (1 year ago)

@@ -53,6 +53,10 @@ def wait(jobs):
     while jobs :
         jobs = [thread for thread in jobs if thread.is_alive()]
         time.sleep(1)
+def wait (jobs):
+    while jobs :
+        jobs = [pthread for pthread in jobs if pthread.is_alive()]
 @app_e.command(name="run")
 def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")],
     index:int = typer.Option(default= None, help="index of the item of interest, otherwise everything in the file will be processed"),

@@ -1,8 +1,8 @@
 __app_name__ = 'data-transport'
 __author__ = 'The Phi Technology'
-__version__= '2.4.30'
-__edition__= 'enterprise'
+__version__= '2.2.22'
 __email__ = "info@the-phi.com"
+__edition__= 'community'
 __license__=f"""
 Copyright 2010 - 2024, Steve L. Nyemba
@@ -20,6 +20,4 @@ __whatsnew__=f"""version {__version__},
 3. support for streaming data, important to use this with large volumes of data
 """

@@ -0,0 +1,138 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Writing to Apache Iceberg\n",
"\n",
"1. Insure you have a Google Bigquery service account key on disk\n",
"2. The service key location is set as an environment variable **BQ_KEY**\n",
"3. The dataset will be automatically created within the project associated with the service key\n",
"\n",
"The cell below creates a dataframe that will be stored within Google Bigquery"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['data transport version ', '2.4.0']\n"
]
}
],
"source": [
"#\n",
"# Writing to Google Bigquery database\n",
"#\n",
"import transport\n",
"from transport import providers\n",
"import pandas as pd\n",
"import os\n",
"\n",
"PRIVATE_KEY = os.environ['BQ_KEY'] #-- location of the service key\n",
"DATASET = 'demo'\n",
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
"# bqw = transport.get.writer(provider=providers.ICEBERG,catalog='mz',database='edw.mz',table='friends')\n",
"bqw = transport.get.writer(provider=providers.ICEBERG,table='edw.mz.friends')\n",
"bqw.write(_data,if_exists='replace') #-- default is append\n",
"print (['data transport version ', transport.__version__])\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Reading from Google Bigquery\n",
"\n",
"The cell below reads the data that has been written by the cell above and computes the average age within a Google Bigquery (simple query). \n",
"\n",
"- Basic read of the designated table (friends) created above\n",
"- Execute an aggregate SQL against the table\n",
"\n",
"**NOTE**\n",
"\n",
"By design **read** object are separated from **write** objects in order to avoid accidental writes to the database.\n",
"Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" name age\n",
"0 James Bond 55\n",
"1 Steve Rogers 150\n",
"2 Steve Nyemba 44\n",
"--------- STATISTICS ------------\n"
]
}
],
"source": [
"\n",
"import transport\n",
"from transport import providers\n",
"import os\n",
"PRIVATE_KEY=os.environ['BQ_KEY']\n",
"pgr = transport.get.reader(provider=providers.ICEBERG,database='edw.mz')\n",
"_df = pgr.read(table='friends')\n",
"_query = 'SELECT COUNT(*) _counts, AVG(age) from friends'\n",
"_sdf = pgr.read(sql=_query)\n",
"print (_df)\n",
"print ('--------- STATISTICS ------------')\n",
"# print (_sdf)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"An **auth-file** is a file that contains database parameters used to access the database. \n",
"For code in shared environments, we recommend \n",
"\n",
"1. Having the **auth-file** stored on disk \n",
"2. and the location of the file is set to an environment variable.\n",
"\n",
"To generate a template of the **auth-file** open the **file generator wizard** found at visit https://healthcareio.the-phi.com/data-transport"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.7"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
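The auth-file cell in the notebook above is left empty. Below is a minimal sketch of the pattern its markdown describes, assuming the file is plain JSON whose keys match the arguments accepted by transport.get.reader; the AUTH_FILE variable name and the key layout are illustrative, not part of the committed notebook.

```python
#
# Sketch of the auth-file pattern described in the notebook (names are assumptions).
# The environment variable points at a JSON file holding the database parameters.
#
import json
import os
import transport

AUTH_FILE = os.environ['AUTH_FILE']     # assumed variable name, set to the auth-file location
with open(AUTH_FILE) as _stream:
    _auth = json.load(_stream)          # e.g. {"provider": "...", "database": "..."} -- assumed layout

# read objects are created separately from write objects, as the notebook notes
reader = transport.get.reader(**_auth)
_df = reader.read(table='friends')
print(_df)
reader.close()
```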

@@ -15,7 +15,6 @@ import time
 MAX_CHUNK = 2000000
 class BigQuery:
-    __template__= {"private_key":None,"dataset":None,"table":None}
     def __init__(self,**_args):
         path = _args['service_key'] if 'service_key' in _args else _args['private_key']
         self.credentials = service_account.Credentials.from_service_account_file(path)

@@ -26,7 +26,6 @@ class Bricks:
     :cluster_path
     :table
     """
-    __template__ = {"host":None,"token":None,"cluster_path":None,"catalog":None,"schema":None}
     def __init__(self,**_args):
         _host = _args['host']
         _token= _args['token']

@@ -10,7 +10,6 @@ import json
 import nextcloud_client as nextcloud
 class Nextcloud :
-    __template__={"url":None,"token":None,"uid":None,"file":None}
     def __init__(self,**_args):
         pass
         self._delimiter = None

@@ -24,7 +24,6 @@ class s3 :
     """
     @TODO: Implement a search function for a file given a bucket??
     """
-    __template__={"access_key":None,"secret_key":None,"bucket":None,"file":None,"region":None}
     def __init__(self,**args) :
         """
         This function will extract a file or set of files from s3 bucket provided

@@ -0,0 +1,19 @@
"""
This file will be intended to handle duckdb database
"""
import duckdb
from transport.common import Reader,Writer
class Duck(Reader):
    def __init__(self,**_args):
        super().__init__(**_args)
        self._path = None if 'path' not in _args else _args['path']
        self._handler = duckdb.connect() if not self._path else duckdb.connect(self._path)
class DuckReader(Duck) :
    def __init__(self,**_args):
        super().__init__(**_args)
    def read(self,**_args) :
        pass
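DuckReader.read in the file above is still a stub. Below is a sketch of how a read against the duckdb handler could look, kept outside the transport classes; the sql/table keyword names are assumptions by analogy with the other readers.

```python
#
# Sketch only: querying a duckdb handler the way the stubbed DuckReader.read might.
# The 'sql' and 'table' keywords are assumed by analogy with the other readers.
#
import duckdb

class DuckReadSketch:
    def __init__(self, path=None):
        # in-memory database when no path is given, mirroring the Duck base class above
        self._handler = duckdb.connect() if not path else duckdb.connect(path)
    def read(self, **_args):
        _sql = _args['sql'] if 'sql' in _args else f"SELECT * FROM {_args['table']}"
        return self._handler.execute(_sql).fetchdf()   # pandas DataFrame
```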

@@ -5,113 +5,42 @@ NOTE: Plugins are converted to a pipeline, so we apply a pipeline when reading o
 - upon initialization we will load plugins
 - on read/write we apply a pipeline (if passed as an argument)
 """
-from transport.plugins import PluginLoader
+from transport.plugins import Plugin, PluginLoader
 import transport
 from transport import providers
-from multiprocessing import Process, RLock
+from multiprocessing import Process
 import time
-import types
-from . import registry
-from datetime import datetime
-import pandas as pd
-import numpy as np
-import os
-import sys
-import itertools
-import json
 import plugin_ix
-class BaseIO :
-    def __init__(self,**_args):
-        self._logger = _args['logger'] if 'logger' in _args else None
-        self._logTable = 'logs' if 'logTable' not in _args else _args['logTable']
-    def setLogger(self,_logger):
-        self._logger = _logger
-    def log (self,**_args):
-        if self._logger :
-            _date = str(datetime.now())
-            _data = dict({'pid':os.getpid(),'date':_date[:10],'time':_date[11:19]},**_args)
-            for key in _data :
-                if type(_data[key]) == list :
-                    _data[key] = [_item.__name__ if type(_item).__name__== 'function' else _item for _item in _data[key]]
-                _data[key] = str(_data[key]) if type(_data[key]) not in [list,dict] else json.dumps(_data[key])
-            self._logger.write(pd.DataFrame([_data])) #,table=self._logTable)
-class IO(BaseIO):
+class IO:
     """
     Base wrapper class for read/write and support for logs
     """
     def __init__(self,**_args):
-        #
-        # We need to initialize the logger here ...
-        #
-        super().__init__(**_args)
         _agent = _args['agent']
-        plugins = _args['plugins'] if 'plugins' else None
+        plugins = _args['plugins'] if 'plugins' in _args else None
+        # _logger = _args['logger'] if 'logger' in _args else None
+        # self._logger = _logger if not type(_agent) in [IReader,IWriter] else _agent._logger #transport.get.writer(label='logger') #if registry.has('logger') else None
+        # if not _logger and hasattr(_agent,'_logger') :
+        # self._logger = getattr(_agent,'_logger')
         self._agent = _agent
-        _date = _date = str(datetime.now())
+        # self._ixloader = plugin_ix.Loader () #-- must indicate where the plugin registry file is
         self._ixloader = plugin_ix.Loader (registry=plugin_ix.Registry(folder=transport.registry.REGISTRY_PATH))
+        # self._logTable = 'logs' #'_'.join(['logs',_date[:10]+_date[11:19]]).replace(':','').replace('-','_')
         if plugins :
             self.init_plugins(plugins)
-    # def setLogger(self,_logger):
-    # self._logger = _logger
-    # def log (self,**_args):
-    # if self._logger :
-    # _date = str(datetime.now())
-    # _data = dict({'pid':os.getpid(),'date':_date[:10],'time':_date[11:19]},**_args)
-    # for key in _data :
-    # if type(_data[key]) == list :
-    # _data[key] = [_item.__name__ if type(_item).__name__== 'function' else _item for _item in _data[key]]
-    # _data[key] = str(_data[key]) if type(_data[key]) not in [list,dict] else json.dumps(_data[key])
-    # self._logger.write(pd.DataFrame([_data])) #,table=self._logTable)
-    # def _init_plugins(self,_items):
-    # """
-    # This function will load pipelined functions as a plugin loader
-    # """
-    # registry.plugins.init()
-    # self._plugins = PluginLoader(registry=registry.plugins)
-    # [self._plugins.set(_name) for _name in _items]
-    # self.log(action='init-plugins',object=self.getClassName(self),input =[_name for _name in _items])
-    # # if 'path' in _args and 'names' in _args :
-    # # self._plugins = PluginLoader(**_args)
-    # # else:
-    # # self._plugins = PluginLoader(registry=registry.plugins)
-    # # [self._plugins.set(_pointer) for _pointer in _args]
-    # #
-    # # @TODO: We should have a way to log what plugins are loaded and ready to use
     def meta (self,**_args):
         if hasattr(self._agent,'meta') :
             return self._agent.meta(**_args)
         return []
-    def getClassName (self,_object):
-        return '.'.join([_object.__class__.__module__,_object.__class__.__name__])
     def close(self):
         if hasattr(self._agent,'close') :
             self._agent.close()
-    def apply(self):
-        """
-        applying pre/post conditions given a pipeline expression
-        """
-        for _pointer in self._plugins :
-            _data = _pointer(_data)
-        time.sleep(1)
+    # def apply(self):
+    # """
+    # applying pre/post conditions given a pipeline expression
+    # """
+    # for _pointer in self._plugins :
+    # _data = _pointer(_data)
     def apply(self,_query):
         if hasattr(self._agent,'apply') :
             return self._agent.apply(_query)
@@ -133,173 +62,70 @@ class IReader(IO):
     """
     def __init__(self,**_args):
         super().__init__(**_args)
-        self._args = _args['args']if 'args' in _args else None
-    def _stream (self,_data ):
-        # self.log(action='streaming',object=self._agent._engine.name, input= type(_data).__name__)
-        _shape = []
-        for _segment in _data :
-            _shape += list(_segment.shape)
-            if self._plugins :
-                # yield self._plugins.apply(_segment,self.log)
-                yield self._ixloader.visitor(_data,self.log)
-            else:
-                yield _segment
-        _objectName = '.'.join([self._agent.__class__.__module__,self._agent.__class__.__name__])
-        _input = {'shape':_shape}
-        if hasattr(self._agent,'_table') :
-            _input['table'] = self._agent._table
-        self.log(action='streaming',object=_objectName, input= _input)
     def read(self,**_args):
         if 'plugins' in _args :
             self.init_plugins(_args['plugins'])
-        if self._args :
-            _data = self._agent.read(**self._args)
-        else:
-            _data = self._agent.read(**_args)
-        _objectName = '.'.join([self._agent.__class__.__module__,self._agent.__class__.__name__])
-        if types.GeneratorType == type(_data):
-            return self._stream(_data)
-        # if self._plugins :
-        # return self._stream(_data)
-        # else:
-        # _count = 0
-        # for _segment in _data :
-        # _count += 1
-        # yield _segment
-        # self.log(action='streaming',object=_objectName, input= {'segments':_count})
-        # return _data
-        elif type(_data) == pd.DataFrame :
-            _shape = _data.shape #[0,0] if not _data.shape[] else list(_data.shape)
-            _input = {'shape':_shape}
-            if hasattr(self._agent,'_table') :
-                _input['table'] = self._agent._table
-            self.log(action='read',object=_objectName, input=_input)
-            _data = self._ixloader.visitor(_data)
-            return _data
+        _data = self._agent.read(**_args)
+        # if self._plugins and self._plugins.ratio() > 0 :
+        # _data = self._plugins.apply(_data)
+        #
+        # output data
+        #
+        # applying the the design pattern
+        _data = self._ixloader.visitor(_data)
+        return _data
 class IWriter(IO):
-    lock = RLock()
-    def __init__(self,**_args):
-        super().__init__(**_args)
+    def __init__(self,**_args): #_agent,pipeline=None):
+        super().__init__(**_args) #_agent,pipeline)
     def write(self,_data,**_args):
+        # if 'plugins' in _args :
+        # self._init_plugins(_args['plugins'])
         if 'plugins' in _args :
-            self._init_plugins(_args['plugins'])
-        # if self._plugins and self._plugins.ratio() > 0 :
-        # _logs = []
-        # _data = self._plugins.apply(_data,_logs,self.log)
-        # [self.log(**_item) for _item in _logs]
-        try:
-            # IWriter.lock.acquire()
-            _data = self._ixloader.visitor(_data)
-            self._agent.write(_data,**_args)
-        finally:
-            # IWriter.lock.release()
-            pass
+            self.init_plugins(_args['plugins'])
+        self._ixloader.visitor(_data)
+        self._agent.write(_data,**_args)
 #
 # The ETL object in its simplest form is an aggregation of read/write objects
 # @TODO: ETL can/should aggregate a writer as a plugin and apply it as a process
-class IETL(BaseIO) :
+class IETL(IReader) :
     """
     This class performs an ETL operation by ineriting a read and adding writes as pipeline functions
     """
     def __init__(self,**_args):
-        # _source = _args['source']
-        # _plugins = _source['plugins'] if 'plugins' in _source else None
-        # # super().__init__(transport.get.reader(**_args['source']))
-        # super().__init__(agent=transport.get.reader(**_source),plugins=_plugins)
-        # # _logger =
-        # if 'target' in _args:
-        # self._targets = _args['target'] if type(_args['target']) == list else [_args['target']]
-        # else:
-        # self._targets = []
-        # self.jobs = []
-        # #
-        # # If the parent is already multiprocessing
-        # if 'token' in _source :
-        # self._logToken = _source['token']
-        # self._sourceArgs = _source['args'] if 'args' in _source else None
-        # self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess']
-        super().__init__()
-        self._source = _args['source']
-        self._targets= _args['target'] if _args['target'] == list else [_args['target']]
-        #
-        # ETL Initialization, we should provide some measure of context ...
-        #
-    # def run(self) :
-    # """
-    # We should apply the etl here, if we are in multiprocessing mode
-    # """
-    # return self.read()
-    def run(self,**_args):
-        # _data = super().read(**_args) if not self._sourceArgs else super().read(**self._sourceArgs)
-        # self._targets = [transport.get.writer(**_kwargs) for _kwargs in self._targets]
-        _reader = transport.get.reader(**self._source)
-        if hasattr(_reader,'_logger') :
-            self.setLogger(_reader._logger)
-        self.log(action='init-etl',input={'source':self._source,'target':self._targets})
-        _data = _reader.read(**self._source['args'])if 'args' in self._source else _reader.read()
-        _reader.close()
-        _writers = [transport.get.writer(**_kwargs) for _kwargs in self._targets]
-        _schema = [] if not getattr(_reader._agent,'_table') else _reader.meta()
-        if types.GeneratorType == type(_data):
-            _index = 0
-            for _segment in _data :
-                _index += 1
-                for _writer in _writers :
-                    self.post(_segment,writer=_writer,index=_index,schema=_schema)
-                time.sleep(1)
+        super().__init__(agent=transport.get.reader(**_args['source']),plugins=None)
+        if 'target' in _args:
+            self._targets = _args['target'] if type(_args['target']) == list else [_args['target']]
         else:
-            for _writer in _writers :
-                self.post(_data,writer=_writer,schema=_schema)
+            self._targets = []
+        self.jobs = []
+        #
+        # If the parent is already multiprocessing
+        self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess']
+    def read(self,**_args):
+        _data = super().read(**_args)
+        _schema = super().meta()
+        for _kwargs in self._targets :
+            if _schema :
+                _kwargs['schema'] = _schema
+            self.post(_data,**_kwargs)
+        # pass
         return _data
-        # return _data
+    def run(self) :
+        return self.read()
     def post (self,_data,**_args) :
         """
         This function returns an instance of a process that will perform the write operation
         :_args parameters associated with writer object
         """
-        #writer = transport.get.writer(**_args)
-        _input = {}
-        try:
-            _action = 'post'
-            _shape = dict(zip(['rows','columns'],_data.shape))
-            _index = _args['index'] if 'index' in _args else 0
-            writer = _args['writer']
-            _schema= _args['schema']
-            #
-            # -- things to log
-            _input = {'shape':_shape,'segment':_index}
-            if hasattr(writer._agent,'_table'):
-                _input['table'] = writer._agent._table
-            for _item in _schema :
-                if _item['type'] in ['INTEGER','BIGINT','INT'] :
-                    _column = _item['name']
-                    _data[_column] = _data[_column].copy().fillna(0).astype(np.int64)
-            writer.write(_data,schema=_schema)
-        except Exception as e:
-            _action = 'post-error'
-            _input['error'] = str(e)
-            print ([e])
-            pass
-        self.log(action=_action,object=writer._agent.__module__, input= _input)
+        writer = transport.get.writer(**_args)
+        if 'schema' in _args :
+            writer.write(_data,schema=_args['schema'])
+        else:
+            writer.write(_data)
+        writer.close()
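Both versions of IETL in the hunk above reduce to the same flow: build a reader from the source dictionary, read once, then hand the frame to one writer per target. Below is a minimal sketch of that flow outside the class; the provider values and dictionary keys are illustrative assumptions, not the package's documented defaults.

```python
#
# Sketch of the ETL flow implemented by IETL above: read once from the source,
# then post the same dataframe to every target writer.
# Provider names and dictionary keys below are illustrative assumptions.
#
import transport
from transport import providers

_source  = {'provider': providers.SQLITE, 'database': 'sample.db3', 'table': 'friends'}
_targets = [{'provider': providers.SQLITE, 'database': 'sample.db3', 'table': 'friends_copy'}]

_reader = transport.get.reader(**_source)
_data   = _reader.read()          # plugins registered on the reader are applied here
_schema = _reader.meta()          # [] when the agent exposes no meta information
_reader.close()

for _kwargs in _targets:
    _writer = transport.get.writer(**_kwargs)
    if _schema :
        _writer.write(_data, schema=_schema)   # mirrors IETL.post on the master side
    else:
        _writer.write(_data)
    _writer.close()
```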

@@ -19,7 +19,6 @@ class Couch:
     @param doc user id involved
     @param dbname database name (target)
     """
-    __template__={"url":None,"doc":None,"dbname":None,"username":None,"password":None}
     def __init__(self,**args):
         url = args['url'] if 'url' in args else 'http://localhost:5984'
         self._id = args['doc']

@@ -25,7 +25,6 @@ class Mongo :
     """
     Basic mongodb functions are captured here
     """
-    __template__={"db":None,"collection":None,"host":None,"port":None,"username":None,"password":None}
     def __init__(self,**args):
         """
         :dbname database name/identifier

@@ -9,7 +9,7 @@ import numpy as np
 import pandas as pd
 class Writer :
-    lock = None
+    lock = Lock()
     _queue = {'default':queue.Queue()}
     def __init__(self,**_args):
         self._cache = {}

@@ -12,7 +12,6 @@ class File :
         """
         self.path = params['path'] if 'path' in params else None
         self.delimiter = params['delimiter'] if 'delimiter' in params else ','
-        self._chunksize = None if 'chunksize' not in params else int(params['chunksize'])
     def isready(self):
         return os.path.exists(self.path)
     def meta(self,**_args):
@@ -26,15 +26,11 @@ class Reader (File):
     def __init__(self,**_args):
         super().__init__(**_args)
-    def _stream(self,path) :
-        reader = pd.read_csv(path,sep=self.delimiter,chunksize=self._chunksize,low_memory=False)
-        for segment in reader :
-            yield segment
     def read(self,**args):
         _path = self.path if 'path' not in args else args['path']
         _delimiter = self.delimiter if 'delimiter' not in args else args['delimiter']
-        return pd.read_csv(_path,sep=self.delimiter) if not self._chunksize else self._stream(_path)
+        return pd.read_csv(_path,delimiter=self.delimiter)
     def stream(self,**args):
         raise Exception ("streaming needs to be implemented")
 class Writer (File):
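The v2.4 side of this hunk streams large CSV files by handing pandas a chunksize and yielding the resulting segments. A standalone sketch of that pattern, independent of the File/Reader classes (the file name and chunk size are illustrative):

```python
#
# Standalone sketch of the chunked-CSV pattern shown in the hunk above:
# pandas yields DataFrame segments instead of loading the whole file at once.
#
import pandas as pd

def stream_csv(path, delimiter=',', chunksize=100000):
    reader = pd.read_csv(path, sep=delimiter, chunksize=chunksize, low_memory=False)
    for segment in reader:
        yield segment

# usage
# for _df in stream_csv('data.csv'):
#     print(_df.shape)
```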

@@ -59,6 +59,9 @@ class PluginLoader :
         pass
     def load (self,**_args):
+        """
+        This function loads a plugin
+        """
         self._modules = {}
         self._names = []
         path = _args ['path']

@@ -12,7 +12,6 @@ from io import StringIO
 This class manages data from the registry and allows (read only)
 @TODO: add property to the DATA attribute
 """
 if 'HOME' in os.environ :
     REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport'])
 else:
@@ -26,6 +25,7 @@ if 'DATA_TRANSPORT_REGISTRY_PATH' in os.environ :
 REGISTRY_FILE= 'transport-registry.json'
 DATA = {}
 def isloaded ():
     return DATA not in [{},None]
 def exists (path=REGISTRY_PATH,_file=REGISTRY_FILE) :

@@ -3,7 +3,7 @@ This namespace/package wrap the sql functionalities for a certain data-stores
 - netezza, postgresql, mysql and sqlite
 - mariadb, redshift (also included)
 """
-from . import postgresql, mysql, netezza, sqlite3, sqlserver, duckdb
+from . import postgresql, mysql, netezza, sqlite, sqlserver, duckdb
 #
@@ -11,7 +11,7 @@ from . import postgresql, mysql, netezza, sqlite3, sqlserver, duckdb
 #
 mariadb = mysql
 redshift = postgresql
-# sqlite3 = sqlite
+sqlite3 = sqlite
 # from transport import sql

@@ -67,7 +67,6 @@ class Base:
     # else:
         return []
     def has(self,**_args):
         return self.meta(**_args)
     def apply(self,sql):
@@ -78,10 +77,11 @@ class Base:
         @TODO: Execution of stored procedures
         """
         if sql.strip().lower().startswith('select') or sql.strip().lower().startswith('with') or sql.strip().startswith('show'):
             return pd.read_sql(sql,self._engine)
         else:
             _handler = self._engine.connect()
-            _handler.execute(text(sql.strip()))
+            _handler.execute(text(sql))
             _handler.commit ()
             _handler.close()
         return None
@@ -141,7 +141,9 @@ class BaseWriter (SQLBase):
     """
     def __init__(self,**_args):
         super().__init__(**_args)
     def write(self,_data,**_args):
         if type(_data) == dict :
             _df = pd.DataFrame([_data])
         elif type(_data) == list :

@@ -1,9 +1,7 @@
 import sqlalchemy
 import pandas as pd
 from transport.sql.common import Base, BaseReader, BaseWriter
-from multiprocessing import RLock
-class SQLite3 :
-    lock = RLock()
+class SQLite (BaseReader):
     def __init__(self,**_args):
         super().__init__(**_args)
         if 'path' in _args :
@@ -14,7 +12,7 @@ class SQLite3 :
         path = self._database
         return f'sqlite:///{path}' # ensure this is the correct path for the sqlite file.
-class Reader(SQLite3,BaseReader):
+class Reader(SQLite,BaseReader):
     def __init__(self,**_args):
         super().__init__(**_args)
     # def read(self,**_args):
@@ -22,12 +20,6 @@ class Reader(SQLite3,BaseReader):
     #     return pd.read_sql(sql,self._engine)
-class Writer (SQLite3,BaseWriter):
+class Writer (SQLite,BaseWriter):
     def __init__(self,**_args):
         super().__init__(**_args)
-    def write(self,_data,**_kwargs):
-        try:
-            SQLite3.lock.acquire()
-            super().write(_data,**_kwargs)
-        finally:
-            SQLite3.lock.release()

@@ -7,7 +7,6 @@ from transport.sql.common import Base, BaseReader, BaseWriter
 class MsSQLServer:
     def __init__(self,**_args) :
         super().__init__(**_args)
         pass
