Compare commits

...

27 Commits

Author SHA1 Message Date
Steve L. Nyemba 492dc8f374 Merge pull request 'new provider console and bug fixes with applied commands' (#25) from v2.2.0 into master
1 month ago
Steve Nyemba 2df926da12 new provider console and bug fixes with applied commands
1 month ago
Steve L. Nyemba e848367378 Merge pull request 'bug fix, duckdb in-memory handling' (#24) from v2.2.0 into master
1 month ago
Steve Nyemba e9aab3b034 bug fix, duckdb in-memory handling
1 month ago
Steve L. Nyemba c872ba8cc2 Merge pull request 'v2.2.0 - Bug fixes with mongodb, console' (#23) from v2.2.0 into master
1 month ago
Steve Nyemba 34db729ad4 bug fixes: mongodb console
1 month ago
Steve Nyemba a7c72391e8 s3 notebook - code as documentation
3 months ago
Steve L. Nyemba baa8164f16 Merge pull request 'aws s3 notebook, brief example' (#22) from v2.2.0 into master
3 months ago
Steve Nyemba 955369fdd8 aws s3 notebook, brief example
3 months ago
Steve L. Nyemba 31556ebd32 Merge pull request 'v2.2.0 bug fix - AWS-S3' (#21) from v2.2.0 into master
3 months ago
Steve Nyemba 63666e95ce bug fix, TODO: figure out how to parse types
3 months ago
Steve Nyemba 9dba5daecd bug fix, TODO: figure out how to parse types
3 months ago
Steve Nyemba 40f9c3930a bug fixes, using boto3 instead of boto for s3 support
3 months ago
Steve L. Nyemba 1e7839198a Merge pull request 'v2.2.0 - shared environment support and duckdb support' (#20) from v2.2.0 into master
3 months ago
Steve Nyemba 3faee02fa2 documentation ...
3 months ago
Steve Nyemba 6f6fd48982 bug fixes: environment variable usage
4 months ago
Steve Nyemba 808378afdb bug fix: delegate (new feature)
4 months ago
Steve Nyemba 2edce85aed documentation duckdb support
4 months ago
Steve Nyemba 235a44be66 bug fix: registry and parameter handling
4 months ago
Steve Nyemba 037019c1d7 bug fix
4 months ago
Steve Nyemba c443c6c953 duckdb support
4 months ago
Steve Nyemba dde4767e37 new version
4 months ago
Steve L. Nyemba dce50a967e Merge pull request 'documentation ...' (#19) from v2.0.4 into master
4 months ago
Steve L. Nyemba 5ccb073865 Merge pull request 'refactor: etl,better reusability & streamlined and threaded' (#18) from v2.0.4 into master
4 months ago
Steve L. Nyemba 3081fb98e7 Merge pull request 'version 2.0 - Refactored, Plugins support' (#17) from v2.0 into master
6 months ago
Steve L. Nyemba 58959359ad Merge pull request 'bug fix: psycopg2 with numpy' (#14) from dev into master
8 months ago
Steve L. Nyemba 68b8f6af5f Merge pull request 'fixes 2024 pandas-gbq and sqlalchemy' (#10) from dev into master
8 months ago

@ -18,6 +18,20 @@ Within the virtual environment perform the following :
pip install git+https://github.com/lnyemba/data-transport.git pip install git+https://github.com/lnyemba/data-transport.git
## Features
- read/write from over a dozen databases
- run ETL jobs seamlessly
- scales and integrates into shared environments like apache zeppelin; jupyterhub; SageMaker; ...
## What's new
Unlike older versions 2.0 and under, we focus on collaborative environments like jupyter-x servers; apache zeppelin:
1. Simpler syntax to create reader or writer
2. auth-file registry that can be referenced using a label
3. duckdb support
## Learn More ## Learn More

@ -1,6 +1,6 @@
__app_name__ = 'data-transport' __app_name__ = 'data-transport'
__author__ = 'The Phi Technology' __author__ = 'The Phi Technology'
__version__= '2.0.4' __version__= '2.2.6'
__email__ = "info@the-phi.com" __email__ = "info@the-phi.com"
__license__=f""" __license__=f"""
Copyright 2010 - 2024, Steve L. Nyemba Copyright 2010 - 2024, Steve L. Nyemba
@ -12,3 +12,10 @@ The above copyright notice and this permission notice shall be included in all c
THE SOFTWARE IS PROVIDED AS IS, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. THE SOFTWARE IS PROVIDED AS IS, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
""" """
__whatsnew__=f"""version {__version__}, focuses on collaborative environments like jupyter-base servers (apache zeppelin; jupyter notebook, jupyterlab, jupyterhub)
1. simpler syntax to create readers/writers
2. auth-file registry that can be referenced using a label
3. duckdb support
"""

@ -0,0 +1,149 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Writing data-transport plugins\n",
"\n",
"The data-transport plugins are designed to automate pre/post processing i.e\n",
"\n",
" - Read -> Post processing\n",
" - Write-> Pre processing\n",
" \n",
"In this example we will assume, data and write both pre/post processing to any supported infrastructure. We will equally show how to specify the plugins within a configuration file"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"#\n",
"# Writing to Google Bigquery database\n",
"#\n",
"import transport\n",
"from transport import providers\n",
"import pandas as pd\n",
"import os\n",
"import shutil\n",
"#\n",
"#\n",
"\n",
"DATABASE = '/home/steve/tmp/demo.db3'\n",
"if os.path.exists(DATABASE) :\n",
" os.remove(DATABASE)\n",
"#\n",
"# \n",
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
"litew = transport.get.writer(provider=providers.SQLITE,database=DATABASE)\n",
"litew.write(_data,table='friends')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Reading from SQLite\n",
"\n",
"The cell below reads the data that has been written by the cell above and computes the average age from a plugin function we will write. \n",
"\n",
"- Basic read of the designated table (friends) created above\n",
"- Read with pipeline functions defined in code\n",
"\n",
"**NOTE**\n",
"\n",
"It is possible to use **transport.factory.instance** or **transport.instance** or **transport.get.<[reader|writer]>** they are the same. It allows the maintainers to know that we used a factory design pattern."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" name age\n",
"0 James Bond 55\n",
"1 Steve Rogers 150\n",
"2 Steve Nyemba 44\n",
"\n",
"\n",
" name age autoinc\n",
"0 James Bond 5.5 0\n",
"1 Steve Rogers 15.0 1\n",
"2 Steve Nyemba 4.4 2\n"
]
}
],
"source": [
"\n",
"import transport\n",
"from transport import providers\n",
"import os\n",
"import numpy as np\n",
"def _autoincrement (_data,**kwargs) :\n",
" \"\"\"\n",
" This function will add an autoincrement field to the table\n",
" \"\"\"\n",
" _data['autoinc'] = np.arange(_data.shape[0])\n",
" \n",
" return _data\n",
"def reduce(_data,**_args) :\n",
" \"\"\"\n",
" This function will reduce the age of the data frame\n",
" \"\"\"\n",
" _data.age /= 10\n",
" return _data\n",
"reader = transport.get.reader(provider=providers.SQLITE,database=DATABASE,table='friends')\n",
"#\n",
"# basic read of the data created in the first cell\n",
"_df = reader.read()\n",
"print (_df)\n",
"print ()\n",
"print()\n",
"#\n",
"# read of the data with pipeline function provided to alter the database\n",
"print (reader.read(pipeline=[_autoincrement,reduce]))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The parameters for instianciating a transport object (reader or writer) can be found at [data-transport home](https://healthcareio.the-phi.com/data-transport)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.7"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

@ -0,0 +1,131 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Writing to AWS S3\n",
"\n",
"We have setup our demo environment with the label **aws** passed to reference our s3 access_key and secret_key and file (called friends.csv). In the cell below we will write the data to our aws s3 bucket named **com.phi.demo**"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2.2.1\n"
]
}
],
"source": [
"#\n",
"# Writing to mongodb database\n",
"#\n",
"import transport\n",
"from transport import providers\n",
"import pandas as pd\n",
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
"mgw = transport.get.writer(label='aws')\n",
"mgw.write(_data)\n",
"print (transport.__version__)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Reading from AWS S3\n",
"\n",
"The cell below reads the data that has been written by the cell above and computes the average age within a mongodb pipeline. The code in the background executes an aggregation using\n",
"\n",
"- Basic read of the designated file **friends.csv**\n",
"- Compute average age using standard pandas functions\n",
"\n",
"**NOTE**\n",
"\n",
"By design **read** object are separated from **write** objects in order to avoid accidental writes to the database.\n",
"Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" bname age\n",
"0 James Bond 55\n",
"1 Steve Rogers 150\n",
"2 Steve Nyemba 44\n",
"--------- STATISTICS ------------\n",
"83.0\n"
]
}
],
"source": [
"\n",
"import transport\n",
"from transport import providers\n",
"import pandas as pd\n",
"\n",
"def cast(stream) :\n",
" print (stream)\n",
" return pd.DataFrame(str(stream))\n",
"mgr = transport.get.reader(label='aws')\n",
"_df = mgr.read()\n",
"print (_df)\n",
"print ('--------- STATISTICS ------------')\n",
"print (_df.age.mean())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"An **auth-file** is a file that contains database parameters used to access the database. \n",
"For code in shared environments, we recommend \n",
"\n",
"1. Having the **auth-file** stored on disk \n",
"2. and the location of the file is set to an environment variable.\n",
"\n",
"To generate a template of the **auth-file** open the **file generator wizard** found at visit https://healthcareio.the-phi.com/data-transport"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.7"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

@ -18,8 +18,8 @@ args = {
# "packages":["transport","info","transport/sql"]}, # "packages":["transport","info","transport/sql"]},
"packages": find_packages(include=['info','transport', 'transport.*'])} "packages": find_packages(include=['info','transport', 'transport.*'])}
args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite'] args["keywords"]=['mongodb','duckdb','couchdb','rabbitmq','file','read','write','s3','sqlite']
args["install_requires"] = ['pyncclient','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql'] args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','termcolor','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql']
args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git" args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git"
args['scripts'] = ['bin/transport'] args['scripts'] = ['bin/transport']
# if sys.version_info[0] == 2 : # if sys.version_info[0] == 2 :

@ -22,7 +22,7 @@ from transport import sql, nosql, cloud, other
import pandas as pd import pandas as pd
import json import json
import os import os
from info import __version__,__author__,__email__,__license__,__app_name__ from info import __version__,__author__,__email__,__license__,__app_name__,__whatsnew__
from transport.iowrapper import IWriter, IReader, IETL from transport.iowrapper import IWriter, IReader, IETL
from transport.plugins import PluginLoader from transport.plugins import PluginLoader
from transport import providers from transport import providers
@ -38,7 +38,11 @@ def init():
if _provider_name.startswith('__') or _provider_name == 'common': if _provider_name.startswith('__') or _provider_name == 'common':
continue continue
PROVIDERS[_provider_name] = {'module':getattr(_module,_provider_name),'type':_module.__name__} PROVIDERS[_provider_name] = {'module':getattr(_module,_provider_name),'type':_module.__name__}
def _getauthfile (path) :
f = open(path)
_object = json.loads(f.read())
f.close()
return _object
def instance (**_args): def instance (**_args):
""" """
This function returns an object of to read or write from a supported database provider/vendor This function returns an object of to read or write from a supported database provider/vendor
@ -77,12 +81,15 @@ def instance (**_args):
if not registry.isloaded () : if not registry.isloaded () :
if ('path' in _args and registry.exists(_args['path'] )) or registry.exists(): if ('path' in _args and registry.exists(_args['path'] )) or registry.exists():
registry.load() if 'path' not in _args else registry.load(_args['path']) registry.load() if 'path' not in _args else registry.load(_args['path'])
_info = {}
if 'label' in _args and registry.isloaded(): if 'label' in _args and registry.isloaded():
_info = registry.get(_args['label']) _info = registry.get(_args['label'])
else:
if _info : _info = registry.get()
# if _info :
_args = dict(_args,**_info) #
# _args = dict(_args,**_info)
_args = dict(_info,**_args) #-- we can override the registry parameters with our own arguments
if 'provider' in _args and _args['provider'] in PROVIDERS : if 'provider' in _args and _args['provider'] in PROVIDERS :
_info = PROVIDERS[_args['provider']] _info = PROVIDERS[_args['provider']]
@ -127,7 +134,7 @@ class get :
""" """
@staticmethod @staticmethod
def reader (**_args): def reader (**_args):
if not _args : if not _args or ('provider' not in _args and 'label' not in _args):
_args['label'] = 'default' _args['label'] = 'default'
_args['context'] = 'read' _args['context'] = 'read'
return instance(**_args) return instance(**_args)
@ -136,7 +143,7 @@ class get :
""" """
This function is a wrapper that will return a writer to a database. It disambiguates the interface This function is a wrapper that will return a writer to a database. It disambiguates the interface
""" """
if not _args : if not _args or ('provider' not in _args and 'label' not in _args):
_args['label'] = 'default' _args['label'] = 'default'
_args['context'] = 'write' _args['context'] = 'write'
return instance(**_args) return instance(**_args)

@ -3,10 +3,13 @@ Data Transport - 1.0
Steve L. Nyemba, The Phi Technology LLC Steve L. Nyemba, The Phi Technology LLC
This file is a wrapper around s3 bucket provided by AWS for reading and writing content This file is a wrapper around s3 bucket provided by AWS for reading and writing content
TODO:
- Address limitations that will properly read csv if it is stored with content type text/csv
""" """
from datetime import datetime from datetime import datetime
import boto import boto3
from boto.s3.connection import S3Connection, OrdinaryCallingFormat # from boto.s3.connection import S3Connection, OrdinaryCallingFormat
import numpy as np import numpy as np
import botocore import botocore
from smart_open import smart_open from smart_open import smart_open
@ -14,6 +17,7 @@ import sys
import json import json
from io import StringIO from io import StringIO
import pandas as pd
import json import json
class s3 : class s3 :
@ -29,46 +33,37 @@ class s3 :
@param filter filename or filtering elements @param filter filename or filtering elements
""" """
try: try:
self.s3 = S3Connection(args['access_key'],args['secret_key'],calling_format=OrdinaryCallingFormat()) self._client = boto3.client('s3',aws_access_key_id=args['access_key'],aws_secret_access_key=args['secret_key'],region_name=args['region'])
self.bucket = self.s3.get_bucket(args['bucket'].strip(),validate=False) if 'bucket' in args else None self._bucket_name = args['bucket']
# self.path = args['path'] self._file_name = args['file']
self.filter = args['filter'] if 'filter' in args else None self._region = args['region']
self.filename = args['file'] if 'file' in args else None
self.bucket_name = args['bucket'] if 'bucket' in args else None
except Exception as e : except Exception as e :
self.s3 = None
self.bucket = None
print (e) print (e)
pass
def has(self,**_args):
_found = None
try:
if 'file' in _args and 'bucket' in _args:
_found = self.meta(**_args)
elif 'bucket' in _args and not 'file' in _args:
_found = self._client.list_objects(Bucket=_args['bucket'])
elif 'file' in _args and not 'bucket' in _args :
_found = self.meta(bucket=self._bucket_name,file = _args['file'])
except Exception as e:
_found = None
pass
return type(_found) == dict
def meta(self,**args): def meta(self,**args):
""" """
This function will return information either about the file in a given bucket
:name name of the bucket :name name of the bucket
""" """
info = self.list(**args) _bucket = self._bucket_name if 'bucket' not in args else args['bucket']
[item.open() for item in info] _file = self._file_name if 'file' not in args else args['file']
return [{"name":item.name,"size":item.size} for item in info] _data = self._client.get_object(Bucket=_bucket,Key=_file)
def list(self,**args): return _data['ResponseMetadata']
""" def close(self):
This function will list the content of a bucket, the bucket must be provided by the name self._client.close()
:name name of the bucket
"""
return list(self.s3.get_bucket(args['name']).list())
def buckets(self):
#
# This function will return all buckets, not sure why but it should be used cautiously
# based on why the s3 infrastructure is used
#
return [item.name for item in self.s3.get_all_buckets()]
# def buckets(self):
pass
# """
# This function is a wrapper around the bucket list of buckets for s3
# """
# return self.s3.get_all_buckets()
class Reader(s3) : class Reader(s3) :
""" """
@ -77,51 +72,66 @@ class Reader(s3) :
- stream content if file is Not None - stream content if file is Not None
@TODO: support read from all buckets, think about it @TODO: support read from all buckets, think about it
""" """
def __init__(self,**args) : def __init__(self,**_args) :
s3.__init__(self,**args) super().__init__(**_args)
def files(self):
r = [] def _stream(self,**_args):
try:
return [item.name for item in self.bucket if item.size > 0]
except Exception as e:
pass
return r
def stream(self,limit=-1):
""" """
At this point we should stream a file from a given bucket At this point we should stream a file from a given bucket
""" """
key = self.bucket.get_key(self.filename.strip()) _object = self._client.get_object(Bucket=_args['bucket'],Key=_args['file'])
if key is None : _stream = None
yield None try:
_stream = _object['Body'].read()
except Exception as e:
pass
if not _stream :
return None
if _object['ContentType'] in ['text/csv'] :
return pd.read_csv(StringIO(str(_stream).replace("\\n","\n").replace("\\r","").replace("\'","")))
else: else:
count = 0 return _stream
with smart_open(key) as remote_file:
for line in remote_file:
if count == limit and limit > 0 :
break
yield line
count += 1
def read(self,**args) : def read(self,**args) :
if self.filename is None :
# _name = self._file_name if 'file' not in args else args['file']
# returning the list of files because no one file was specified. _bucket = args['bucket'] if 'bucket' in args else self._bucket_name
return self.files() return self._stream(bucket=_bucket,file=_name)
else:
limit = args['size'] if 'size' in args else -1
return self.stream(limit)
class Writer(s3) : class Writer(s3) :
"""
def __init__(self,**args) : """
s3.__init__(self,**args) def __init__(self,**_args) :
def mkdir(self,name): super().__init__(**_args)
#
#
if not self.has(bucket=self._bucket_name) :
self.make_bucket(self._bucket_name)
def make_bucket(self,bucket_name):
""" """
This function will create a folder in a bucket This function will create a folder in a bucket,It is best that the bucket is organized as a namespace
:name name of the folder :name name of the folder
""" """
self.s3.put_object(Bucket=self.bucket_name,key=(name+'/'))
def write(self,content): self._client.create_bucket(Bucket=bucket_name,CreateBucketConfiguration={'LocationConstraint': self._region})
file = StringIO(content.decode("utf8")) def write(self,_data,**_args):
self.s3.upload_fileobj(file,self.bucket_name,self.filename) """
This function will write the data to the s3 bucket, files can be either csv, or json formatted files
"""
content = 'text/plain'
if type(_data) == pd.DataFrame :
_stream = _data.to_csv(index=False)
content = 'text/csv'
elif type(_data) == dict :
_stream = json.dumps(_data)
content = 'application/json'
else:
_stream = _data
file = StringIO(_stream)
bucket = self._bucket_name if 'bucket' not in _args else _args['bucket']
file_name = self._file_name if 'file' not in _args else _args['file']
self._client.put_object(Bucket=bucket, Key = file_name, Body=_stream,ContentType=content)
pass pass

@ -0,0 +1,19 @@
"""
This file will be intended to handle duckdb database
"""
import duckdb
from transport.common import Reader,Writer
class Duck(Reader):
def __init__(self,**_args):
super().__init__(**_args)
self._path = None if 'path' not in _args else _args['path']
self._handler = duckdb.connect() if not self._path else duckdb.connect(self._path)
class DuckReader(Duck) :
def __init__(self,**_args):
super().__init__(**_args)
def read(self,**_args) :
pass

@ -52,6 +52,13 @@ class IO:
if hasattr(self._agent,'apply') : if hasattr(self._agent,'apply') :
return self._agent.apply(_query) return self._agent.apply(_query)
return None return None
def submit(self,_query):
return self.delegate('submit',_query)
def delegate(self,_name,_query):
if hasattr(self._agent,_name) :
pointer = getattr(self._agent,_name)
return pointer(_query)
return None
class IReader(IO): class IReader(IO):
""" """
This is a wrapper for read functionalities This is a wrapper for read functionalities

@ -33,6 +33,8 @@ class Mongo :
:password password for current user :password password for current user
""" """
self.host = 'localhost' if 'host' not in args else args['host'] self.host = 'localhost' if 'host' not in args else args['host']
if ':' not in self.host and 'port' in args :
self.host = ':'.join([self.host,str(args['port'])])
self.mechanism= 'SCRAM-SHA-256' if 'mechanism' not in args else args['mechanism'] self.mechanism= 'SCRAM-SHA-256' if 'mechanism' not in args else args['mechanism']
# authSource=(args['authSource'] if 'authSource' in args else self.dbname) # authSource=(args['authSource'] if 'authSource' in args else self.dbname)
self._lock = False if 'lock' not in args else args['lock'] self._lock = False if 'lock' not in args else args['lock']

@ -1 +1 @@
from . import files, http, rabbitmq, callback, files from . import files, http, rabbitmq, callback, files, console

@ -10,8 +10,11 @@ HTTP='http'
BIGQUERY ='bigquery' BIGQUERY ='bigquery'
FILE = 'file' FILE = 'file'
ETL = 'etl' ETL = 'etl'
SQLITE = 'sqlite' SQLITE = 'sqlite'
SQLITE3= 'sqlite3' SQLITE3= 'sqlite3'
DUCKDB = 'duckdb'
REDSHIFT = 'redshift' REDSHIFT = 'redshift'
NETEZZA = 'netezza' NETEZZA = 'netezza'
MYSQL = 'mysql' MYSQL = 'mysql'
@ -42,5 +45,6 @@ PGSQL = POSTGRESQL
AWS_S3 = 's3' AWS_S3 = 's3'
RABBIT = RABBITMQ RABBIT = RABBITMQ
# QLISTENER = 'qlistener' # QLISTENER = 'qlistener'

@ -10,6 +10,11 @@ This class manages data from the registry and allows (read only)
""" """
REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport']) REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport'])
#
# This path can be overriden by an environment variable ...
#
if 'DATA_TRANSPORT_REGISTRY_PATH' in os.environ :
REGISTRY_PATH = os.environ['DATA_TRANSPORT_REGISTRY_PATH']
REGISTRY_FILE= 'transport-registry.json' REGISTRY_FILE= 'transport-registry.json'
DATA = {} DATA = {}
@ -37,7 +42,7 @@ def init (email,path=REGISTRY_PATH,override=False):
Initializing the registry and will raise an exception in the advent of an issue Initializing the registry and will raise an exception in the advent of an issue
""" """
p = '@' in email p = '@' in email
q = False if '.' not in email else email.split('.')[-1] in ['edu','com','io','ai'] q = False if '.' not in email else email.split('.')[-1] in ['edu','com','io','ai','org']
if p and q : if p and q :
_config = {"email":email,'version':__version__} _config = {"email":email,'version':__version__}
if not os.path.exists(path): if not os.path.exists(path):

@ -3,7 +3,7 @@ This namespace/package wrap the sql functionalities for a certain data-stores
- netezza, postgresql, mysql and sqlite - netezza, postgresql, mysql and sqlite
- mariadb, redshift (also included) - mariadb, redshift (also included)
""" """
from . import postgresql, mysql, netezza, sqlite, sqlserver from . import postgresql, mysql, netezza, sqlite, sqlserver, duckdb
# #

@ -3,6 +3,8 @@ This file encapsulates common operations associated with SQL databases via SQLAl
""" """
import sqlalchemy as sqa import sqlalchemy as sqa
from sqlalchemy import text
import pandas as pd import pandas as pd
class Base: class Base:
@ -56,7 +58,15 @@ class Base:
@TODO: Execution of stored procedures @TODO: Execution of stored procedures
""" """
return pd.read_sql(sql,self._engine) if sql.lower().startswith('select') or sql.lower().startswith('with') else None if sql.lower().startswith('select') or sql.lower().startswith('with') :
return pd.read_sql(sql,self._engine)
else:
_handler = self._engine.connect()
_handler.execute(text(sql))
_handler.commit ()
_handler.close()
return None
class SQLBase(Base): class SQLBase(Base):
def __init__(self,**_args): def __init__(self,**_args):

@ -0,0 +1,24 @@
"""
This module implements the handler for duckdb (in memory or not)
"""
from transport.sql.common import Base, BaseReader, BaseWriter
class Duck :
def __init__(self,**_args):
#
# duckdb with none as database will operate as an in-memory database
#
self.database = _args['database'] if 'database' in _args else ''
def get_provider(self):
return "duckdb"
def _get_uri(self,**_args):
return f"""duckdb:///{self.database}"""
class Reader(Duck,BaseReader) :
def __init__(self,**_args):
Duck.__init__(self,**_args)
BaseReader.__init__(self,**_args)
class Writer(Duck,BaseWriter):
def __init__(self,**_args):
Duck.__init__(self,**_args)
BaseWriter.__init__(self,**_args)
Loading…
Cancel
Save