refactor: ETL, better reusability, streamlined & threaded #18

Merged
steve merged 3 commits from v2.0.4 into master 4 months ago

@@ -44,12 +44,15 @@ import sys
 import transport
 import time
 from multiprocessing import Process
-import typer
 import os
 import transport
 from transport import etl
 # from transport import providers
+import typer
+from typing_extensions import Annotated
+from typing import Optional
+import time
 app = typer.Typer()
@@ -62,28 +65,33 @@ def wait(jobs):
         time.sleep(1)
 @app.command(name="apply")
-def apply (path,index=None):
+def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")],
+           index:int = typer.Option(help="index of the item of interest, otherwise everything in the file will be processed")):
     """
     This function applies data transport from one source to one or several others
-    :path   path of the configuration file
-    :index  index of the _item of interest (otherwise everything will be processed)
     """
-    _proxy = lambda _object: _object.write(_object.read())
+    # _proxy = lambda _object: _object.write(_object.read())
     if os.path.exists(path):
         file = open(path)
         _config = json.loads (file.read() )
         file.close()
         if index :
-            _config = _config[ int(index)]
-            etl.instance(**_config)
-        else:
-            etl.instance(config=_config)
+            _config = [_config[ int(index)]]
+        jobs = []
+        for _args in _config :
+            pthread = etl.instance(**_args) #-- automatically starts the process
+            jobs.append(pthread)
+        #
+        # @TODO: Log the number of processes started and estimated time
+        while jobs :
+            jobs = [pthread for pthread in jobs if pthread.is_alive()]
+            time.sleep(1)
+        #
+        # @TODO: Log the job termination here ...
 @app.command(name="providers")
-def supported (format:str="table") :
+def supported (format:Annotated[str,typer.Argument(help="format of the output, supported formats are (list,table,json)")]="table") :
     """
-    This function will print supported providers and their associated classifications
+    This function will print supported providers/vendors and their associated classifications
     """
     _df = (transport.supported())
     if format in ['list','json'] :
@@ -94,9 +102,15 @@ def supported (format:str="table") :
 @app.command()
 def version():
-    print (transport.version.__version__)
+    """
+    This function will display version and license information
+    """
+    print (transport.__app_name__,'version ',transport.__version__)
+    print (transport.__license__)
 @app.command()
-def generate (path:str):
+def generate (path:Annotated[str,typer.Argument(help="path of the ETL configuration file template (name included)")]):
     """
     This function will generate a configuration template to give a sense of how to create one
     """
@@ -104,15 +118,12 @@ def generate (path:str):
         {
             "source":{"provider":"http","url":"https://raw.githubusercontent.com/codeforamerica/ohana-api/master/data/sample-csv/addresses.csv"},
             "target":
-            [{"provider":"file","path":"addresses.csv","delimiter":"csv"},{"provider":"sqlite","database":"sample.db3","table":"addresses"}]
+            [{"provider":"files","path":"addresses.csv","delimiter":","},{"provider":"sqlite","database":"sample.db3","table":"addresses"}]
         }
     ]
     file = open(path,'w')
     file.write(json.dumps(_config))
     file.close()
-@app.command()
-def usage():
-    print (__doc__)
 if __name__ == '__main__' :
     app()
 # #
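In effect, `apply` now fans each configuration item out to its own process and polls until all of them finish. A minimal sketch of the same flow in plain Python (the configuration path is illustrative):

    import json, time
    from transport import etl

    _config = json.loads(open('/tmp/etl-config.json').read())  # a list of {source, target} items
    jobs = [etl.instance(**_args) for _args in _config]        # etl.instance() starts a Process per item
    while jobs :
        jobs = [pthread for pthread in jobs if pthread.is_alive()]
        time.sleep(1)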

@@ -1,8 +1,8 @@
+__app_name__ = 'data-transport'
 __author__ = 'The Phi Technology'
-__version__= '2.0.2'
-__license__="""
+__version__= '2.0.4'
+__email__ = "info@the-phi.com"
+__license__=f"""
 Copyright 2010 - 2024, Steve L. Nyemba
 Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the Software), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

@@ -15,21 +15,21 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 1,
+   "execution_count": 3,
    "metadata": {},
    "outputs": [
     {
      "name": "stderr",
      "output_type": "stream",
      "text": [
-      "100%|██████████| 1/1 [00:00<00:00, 5440.08it/s]\n"
+      "100%|██████████| 1/1 [00:00<00:00, 10106.76it/s]\n"
      ]
     },
     {
      "name": "stdout",
      "output_type": "stream",
      "text": [
-      "['data transport version ', '2.0.0']\n"
+      "['data transport version ', '2.0.4']\n"
      ]
     }
    ],
@@ -45,7 +45,7 @@
    "PRIVATE_KEY = os.environ['BQ_KEY'] #-- location of the service key\n",
    "DATASET = 'demo'\n",
    "_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
-   "bqw = transport.factory.instance(provider=providers.BIGQUERY,dataset=DATASET,table='friends',context='write',private_key=PRIVATE_KEY)\n",
+   "bqw = transport.get.writer(provider=providers.BIGQUERY,dataset=DATASET,table='friends',private_key=PRIVATE_KEY)\n",
    "bqw.write(_data,if_exists='replace') #-- default is append\n",
    "print (['data transport version ', transport.__version__])\n"
   ]
@@ -63,7 +63,8 @@
    "\n",
    "**NOTE**\n",
    "\n",
-   "It is possible to use **transport.factory.instance** or **transport.instance** they are the same. It allows the maintainers to know that we used a factory design pattern."
+   "By design, **read** objects are separated from **write** objects in order to avoid accidental writes to the database.\n",
+   "Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
   ]
  },
  {
@@ -93,7 +94,7 @@
    "from transport import providers\n",
    "import os\n",
    "PRIVATE_KEY=os.environ['BQ_KEY']\n",
-   "pgr = transport.instance(provider=providers.BIGQUERY,dataset='demo',table='friends',private_key=PRIVATE_KEY)\n",
+   "pgr = transport.get.reader(provider=providers.BIGQUERY,dataset='demo',table='friends',private_key=PRIVATE_KEY)\n",
    "_df = pgr.read()\n",
    "_query = 'SELECT COUNT(*) _counts, AVG(age) from demo.friends'\n",
    "_sdf = pgr.read(sql=_query)\n",
@@ -106,35 +107,13 @@
    "cell_type": "markdown",
    "metadata": {},
    "source": [
-    "The cell bellow show the content of an auth_file, in this case if the dataset/table in question is not to be shared then you can use auth_file with information associated with the parameters.\n",
-    "\n",
-    "**NOTE**:\n",
-    "\n",
-    "The auth_file is intended to be **JSON** formatted"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 3,
-   "metadata": {},
-   "outputs": [
-    {
-     "data": {
-      "text/plain": [
-       "{'dataset': 'demo', 'table': 'friends'}"
-      ]
-     },
-     "execution_count": 3,
-     "metadata": {},
-     "output_type": "execute_result"
-    }
-   ],
-   "source": [
-    "\n",
-    "{\n",
-    "    \n",
-    "   \"dataset\":\"demo\",\"table\":\"friends\"\n",
-    "}"
+    "An **auth-file** is a file that contains the parameters used to access the database.\n",
+    "For code in shared environments, we recommend\n",
+    "\n",
+    "1. keeping the **auth-file** on disk, and\n",
+    "2. setting the location of the file in an environment variable.\n",
+    "\n",
+    "To generate a template of the **auth-file**, open the **file generator wizard** at https://healthcareio.the-phi.com/data-transport"
   ]
  },
 {
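The read/write split called out in the NOTE above recurs in every notebook below. A minimal sketch of the two entry points (provider and parameters are illustrative):

    import transport
    from transport import providers

    pgr = transport.get.reader(provider=providers.POSTGRESQL, database='demo', table='friends')
    pgw = transport.get.writer(provider=providers.POSTGRESQL, database='demo', table='friends')
    _df = pgr.read()   # a reader only exposes read(...)
    pgw.write(_df)     # a writer only exposes write(...); default behavior is append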

@@ -11,14 +11,14 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 1,
+   "execution_count": 4,
    "metadata": {},
    "outputs": [
     {
      "name": "stdout",
      "output_type": "stream",
      "text": [
-      "2.0.0\n"
+      "2.0.4\n"
      ]
     }
    ],
@@ -30,7 +30,7 @@
    "from transport import providers\n",
    "import pandas as pd\n",
    "_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
-   "mgw = transport.factory.instance(provider=providers.MONGODB,db='demo',collection='friends',context='write')\n",
+   "mgw = transport.get.writer(provider=providers.MONGODB,db='demo',collection='friends')\n",
    "mgw.write(_data)\n",
    "print (transport.__version__)"
   ]
@@ -48,12 +48,13 @@
    "\n",
    "**NOTE**\n",
    "\n",
-   "It is possible to use **transport.factory.instance** or **transport.instance** they are the same. It allows the maintainers to know that we used a factory design pattern."
+   "By design, **read** objects are separated from **write** objects in order to avoid accidental writes to the database.\n",
+   "Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
   ]
  },
  {
   "cell_type": "code",
-  "execution_count": 4,
+  "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
@@ -73,7 +74,7 @@
    "\n",
    "import transport\n",
    "from transport import providers\n",
-   "mgr = transport.instance(provider=providers.MONGODB,db='foo',collection='friends')\n",
+   "mgr = transport.get.reader(provider=providers.MONGODB,db='foo',collection='friends')\n",
    "_df = mgr.read()\n",
    "PIPELINE = [{\"$group\":{\"_id\":0,\"_counts\":{\"$sum\":1}, \"_mean\":{\"$avg\":\"$age\"}}}]\n",
    "_sdf = mgr.read(aggregate='friends',pipeline=PIPELINE)\n",
@@ -86,41 +87,13 @@
    "cell_type": "markdown",
    "metadata": {},
    "source": [
-    "The cell bellow show the content of an auth_file, in this case if the dataset/table in question is not to be shared then you can use auth_file with information associated with the parameters.\n",
-    "\n",
-    "**NOTE**:\n",
-    "\n",
-    "The auth_file is intended to be **JSON** formatted"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 1,
-   "metadata": {},
-   "outputs": [
-    {
-     "data": {
-      "text/plain": [
-       "{'host': 'klingon.io',\n",
-       " 'port': 27017,\n",
-       " 'username': 'me',\n",
-       " 'password': 'foobar',\n",
-       " 'db': 'foo',\n",
-       " 'collection': 'friends',\n",
-       " 'authSource': '<authdb>',\n",
-       " 'mechamism': '<SCRAM-SHA-256|MONGODB-CR|SCRAM-SHA-1>'}"
-      ]
-     },
-     "execution_count": 1,
-     "metadata": {},
-     "output_type": "execute_result"
-    }
-   ],
-   "source": [
-    "{\n",
-    "    \"host\":\"klingon.io\",\"port\":27017,\"username\":\"me\",\"password\":\"foobar\",\"db\":\"foo\",\"collection\":\"friends\",\n",
-    "    \"authSource\":\"<authdb>\",\"mechamism\":\"<SCRAM-SHA-256|MONGODB-CR|SCRAM-SHA-1>\"\n",
-    "}"
+    "An **auth-file** is a file that contains the parameters used to access the database.\n",
+    "For code in shared environments, we recommend\n",
+    "\n",
+    "1. keeping the **auth-file** on disk, and\n",
+    "2. setting the location of the file in an environment variable.\n",
+    "\n",
+    "To generate a template of the **auth-file**, open the **file generator wizard** at https://healthcareio.the-phi.com/data-transport"
   ]
  },
 {
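The auth-file recommendation above, put into practice: keep the JSON file on disk and resolve its path through an environment variable. The variable DT_AUTH_FOLDER below matches the one used in the MSSQL notebook; the file name mongo.json and its contents are illustrative:

    import os
    import transport
    from transport import providers

    AUTH_FOLDER = os.environ['DT_AUTH_FOLDER']                 # path kept out of the notebook
    MONGO_AUTH_FILE = os.sep.join([AUTH_FOLDER, 'mongo.json'])  # e.g. {"host":"...","port":27017,"username":"...","password":"..."}
    mgr = transport.get.reader(provider=providers.MONGODB, db='foo', collection='friends',
                               auth_file=MONGO_AUTH_FILE)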

@@ -17,17 +17,9 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 1,
+   "execution_count": null,
    "metadata": {},
-   "outputs": [
-    {
-     "name": "stdout",
-     "output_type": "stream",
-     "text": [
-      "['data transport version ', '2.0.0']\n"
-     ]
-    }
-   ],
+   "outputs": [],
   "source": [
    "#\n",
    "# Writing to Google Bigquery database\n",
@@ -41,7 +33,7 @@
    "MSSQL_AUTH_FILE= os.sep.join([AUTH_FOLDER,'mssql.json'])\n",
    "\n",
    "_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
-   "msw = transport.factory.instance(provider=providers.MSSQL,table='friends',context='write',auth_file=MSSQL_AUTH_FILE)\n",
+   "msw = transport.get.writer(provider=providers.MSSQL,table='friends',auth_file=MSSQL_AUTH_FILE)\n",
    "msw.write(_data,if_exists='replace') #-- default is append\n",
    "print (['data transport version ', transport.__version__])\n"
   ]
@@ -59,30 +51,15 @@
    "\n",
    "**NOTE**\n",
    "\n",
-   "It is possible to use **transport.factory.instance** or **transport.instance** they are the same. It allows the maintainers to know that we used a factory design pattern."
+   "By design, **read** objects are separated from **write** objects in order to avoid accidental writes to the database.\n",
+   "Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
   ]
  },
  {
   "cell_type": "code",
-  "execution_count": 5,
+  "execution_count": null,
   "metadata": {},
-  "outputs": [
-   {
-    "name": "stdout",
-    "output_type": "stream",
-    "text": [
-     "           name  age\n",
-     "0    James Bond   55\n",
-     "1  Steve Rogers  150\n",
-     "2  Steve Nyemba   44\n",
-     "\n",
-     "--------- STATISTICS ------------\n",
-     "\n",
-     "   _counts    \n",
-     "0        3  83\n"
-    ]
-   }
-  ],
+  "outputs": [],
  "source": [
   "\n",
   "import transport\n",
@@ -91,7 +68,7 @@
   "AUTH_FOLDER = os.environ['DT_AUTH_FOLDER'] #-- location of the service key\n",
   "MSSQL_AUTH_FILE= os.sep.join([AUTH_FOLDER,'mssql.json'])\n",
   "\n",
-  "msr = transport.instance(provider=providers.MSSQL,table='friends',auth_file=MSSQL_AUTH_FILE)\n",
+  "msr = transport.get.reader(provider=providers.MSSQL,table='friends',auth_file=MSSQL_AUTH_FILE)\n",
   "_df = msr.read()\n",
   "_query = 'SELECT COUNT(*) _counts, AVG(age) from friends'\n",
   "_sdf = msr.read(sql=_query)\n",
@@ -104,25 +81,31 @@
   "cell_type": "markdown",
   "metadata": {},
   "source": [
-   "The cell bellow show the content of an auth_file, in this case if the dataset/table in question is not to be shared then you can use auth_file with information associated with the parameters.\n",
-   "\n",
-   "**NOTE**:\n",
-   "\n",
-   "The auth_file is intended to be **JSON** formatted"
+   "An **auth-file** is a file that contains the parameters used to access the database.\n",
+   "For code in shared environments, we recommend\n",
+   "\n",
+   "1. keeping the **auth-file** on disk, and\n",
+   "2. setting the location of the file in an environment variable.\n",
+   "\n",
+   "To generate a template of the **auth-file**, open the **file generator wizard** at https://healthcareio.the-phi.com/data-transport"
  ]
 },
 {
  "cell_type": "code",
- "execution_count": 3,
+ "execution_count": 1,
  "metadata": {},
  "outputs": [
   {
    "data": {
     "text/plain": [
-     "{'dataset': 'demo', 'table': 'friends'}"
+     "{'provider': 'sqlserver',\n",
+     " 'dataset': 'demo',\n",
+     " 'table': 'friends',\n",
+     " 'username': '<username>',\n",
+     " 'password': '<password>'}"
    ]
   },
-  "execution_count": 3,
+  "execution_count": 1,
  "metadata": {},
  "output_type": "execute_result"
 }
@@ -130,10 +113,17 @@
 "source": [
  "\n",
  "{\n",
-  "    \n",
+  "    \"provider\":\"sqlserver\",\n",
  "    \"dataset\":\"demo\",\"table\":\"friends\",\"username\":\"<username>\",\"password\":\"<password>\"\n",
  "}"
 ]
+},
+{
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": []
 }
],
"metadata": {

@@ -14,14 +14,14 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 8,
+   "execution_count": 3,
    "metadata": {},
    "outputs": [
     {
      "name": "stdout",
      "output_type": "stream",
      "text": [
-      "2.0.0\n"
+      "2.0.4\n"
      ]
     }
    ],
@@ -33,7 +33,7 @@
    "from transport import providers\n",
    "import pandas as pd\n",
    "_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
-   "myw = transport.factory.instance(provider=providers.MYSQL,database='demo',table='friends',context='write',auth_file=\"/home/steve/auth-mysql.json\")\n",
+   "myw = transport.get.writer(provider=providers.MYSQL,database='demo',table='friends',auth_file=\"/home/steve/auth-mysql.json\")\n",
    "myw.write(_data,if_exists='replace') #-- default is append\n",
    "print (transport.__version__)"
   ]
@@ -51,12 +51,13 @@
    "\n",
    "**NOTE**\n",
    "\n",
-   "It is possible to use **transport.factory.instance** or **transport.instance** they are the same. It allows the maintainers to know that we used a factory design pattern."
+   "By design, **read** objects are separated from **write** objects in order to avoid accidental writes to the database.\n",
+   "Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
   ]
  },
  {
   "cell_type": "code",
-  "execution_count": 9,
+  "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
@@ -68,8 +69,8 @@
    "1  Steve Rogers  150\n",
    "2  Steve Nyemba   44\n",
    "--------- STATISTICS ------------\n",
-   "   _counts   avg\n",
+   "   _counts  AVG(age)\n",
    "0        3      83.0\n"
   ]
  }
 ],
@@ -77,7 +78,7 @@
   "\n",
   "import transport\n",
   "from transport import providers\n",
-  "myr = transport.instance(provider=providers.POSTGRESQL,database='demo',table='friends',auth_file='/home/steve/auth-mysql.json')\n",
+  "myr = transport.get.reader(provider=providers.MYSQL,database='demo',table='friends',auth_file='/home/steve/auth-mysql.json')\n",
  "_df = myr.read()\n",
  "_query = 'SELECT COUNT(*) _counts, AVG(age) from friends'\n",
  "_sdf = myr.read(sql=_query)\n",
@@ -90,16 +91,18 @@
  "cell_type": "markdown",
  "metadata": {},
  "source": [
- "The cell bellow show the content of an auth_file, in this case if the dataset/table in question is not to be shared then you can use auth_file with information associated with the parameters.\n",
- "\n",
- "**NOTE**:\n",
- "\n",
- "The auth_file is intended to be **JSON** formatted"
+ "An **auth-file** is a file that contains the parameters used to access the database.\n",
+ "For code in shared environments, we recommend\n",
+ "\n",
+ "1. keeping the **auth-file** on disk, and\n",
+ "2. setting the location of the file in an environment variable.\n",
+ "\n",
+ "To generate a template of the **auth-file**, open the **file generator wizard** at https://healthcareio.the-phi.com/data-transport"
 ]
},
{
 "cell_type": "code",
- "execution_count": 1,
+ "execution_count": 5,
 "metadata": {},
 "outputs": [
  {
@@ -109,21 +112,29 @@
   " 'port': 3306,\n",
   " 'username': 'me',\n",
   " 'password': 'foobar',\n",
+  " 'provider': 'mysql',\n",
   " 'database': 'demo',\n",
   " 'table': 'friends'}"
  ]
 },
- "execution_count": 1,
+ "execution_count": 5,
 "metadata": {},
 "output_type": "execute_result"
 }
],
"source": [
 "{\n",
- "    \"host\":\"klingon.io\",\"port\":3306,\"username\":\"me\",\"password\":\"foobar\",\n",
+ "    \"host\":\"klingon.io\",\"port\":3306,\"username\":\"me\",\"password\":\"foobar\", \"provider\":\"mysql\",\n",
 "    \"database\":\"demo\",\"table\":\"friends\"\n",
 "}"
 ]
+},
+{
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": []
 }
],
"metadata": {

@@ -14,14 +14,14 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 8,
+   "execution_count": 1,
    "metadata": {},
    "outputs": [
     {
      "name": "stdout",
      "output_type": "stream",
      "text": [
-      "2.0.0\n"
+      "2.0.4\n"
      ]
     }
    ],
@@ -33,7 +33,7 @@
    "from transport import providers\n",
    "import pandas as pd\n",
    "_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
-   "pgw = transport.factory.instance(provider=providers.POSTGRESQL,database='demo',table='friends',context='write')\n",
+   "pgw = transport.get.writer(provider=providers.POSTGRESQL,database='demo',table='friends')\n",
    "pgw.write(_data,if_exists='replace') #-- default is append\n",
    "print (transport.__version__)"
   ]
@@ -49,14 +49,16 @@
    "- Basic read of the designated table (friends) created above\n",
    "- Execute an aggregate SQL against the table\n",
    "\n",
+   "\n",
    "**NOTE**\n",
    "\n",
-   "It is possible to use **transport.factory.instance** or **transport.instance** they are the same. It allows the maintainers to know that we used a factory design pattern."
+   "By design, **read** objects are separated from **write** objects in order to avoid accidental writes to the database.\n",
+   "Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
   ]
  },
  {
   "cell_type": "code",
-  "execution_count": 6,
+  "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
@@ -77,7 +79,7 @@
   "\n",
   "import transport\n",
   "from transport import providers\n",
-  "pgr = transport.instance(provider=providers.POSTGRESQL,database='demo',table='friends')\n",
+  "pgr = transport.get.reader(provider=providers.POSTGRESQL,database='demo',table='friends')\n",
  "_df = pgr.read()\n",
  "_query = 'SELECT COUNT(*) _counts, AVG(age) from friends'\n",
  "_sdf = pgr.read(sql=_query)\n",
@@ -90,16 +92,18 @@
  "cell_type": "markdown",
  "metadata": {},
  "source": [
- "The cell bellow show the content of an auth_file, in this case if the dataset/table in question is not to be shared then you can use auth_file with information associated with the parameters.\n",
- "\n",
- "**NOTE**:\n",
- "\n",
- "The auth_file is intended to be **JSON** formatted"
+ "An **auth-file** is a file that contains the parameters used to access the database.\n",
+ "For code in shared environments, we recommend\n",
+ "\n",
+ "1. keeping the **auth-file** on disk, and\n",
+ "2. setting the location of the file in an environment variable.\n",
+ "\n",
+ "To generate a template of the **auth-file**, open the **file generator wizard** at https://healthcareio.the-phi.com/data-transport"
 ]
},
{
 "cell_type": "code",
- "execution_count": 1,
+ "execution_count": 4,
 "metadata": {},
 "outputs": [
  {
@@ -109,18 +113,19 @@
   " 'port': 5432,\n",
   " 'username': 'me',\n",
   " 'password': 'foobar',\n",
+  " 'provider': 'postgresql',\n",
   " 'database': 'demo',\n",
   " 'table': 'friends'}"
  ]
 },
- "execution_count": 1,
+ "execution_count": 4,
 "metadata": {},
 "output_type": "execute_result"
 }
],
"source": [
 "{\n",
- "    \"host\":\"klingon.io\",\"port\":5432,\"username\":\"me\",\"password\":\"foobar\",\n",
+ "    \"host\":\"klingon.io\",\"port\":5432,\"username\":\"me\",\"password\":\"foobar\", \"provider\":\"postgresql\",\n",
 "    \"database\":\"demo\",\"table\":\"friends\"\n",
 "}"
 ]

@@ -18,7 +18,7 @@
    "name": "stdout",
    "output_type": "stream",
    "text": [
-    "2.0.0\n"
+    "2.0.4\n"
    ]
   }
  ],
@@ -30,7 +30,7 @@
  "from transport import providers\n",
  "import pandas as pd\n",
  "_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
- "sqw = transport.factory.instance(provider=providers.SQLITE,database='/home/steve/demo.db3',table='friends',context='write')\n",
+ "sqw = transport.get.writer(provider=providers.SQLITE,database='/home/steve/demo.db3',table='friends')\n",
 "sqw.write(_data,if_exists='replace') #-- default is append\n",
 "print (transport.__version__)"
 ]
@@ -46,9 +46,11 @@
  "- Basic read of the designated table (friends) created above\n",
  "- Execute an aggregate SQL against the table\n",
  "\n",
+ "\n",
 "**NOTE**\n",
 "\n",
- "It is possible to use **transport.factory.instance** or **transport.instance** they are the same. It allows the maintainers to know that we used a factory design pattern."
+ "By design, **read** objects are separated from **write** objects in order to avoid accidental writes to the database.\n",
+ "Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
 ]
},
{
@@ -74,10 +76,10 @@
 "\n",
 "import transport\n",
 "from transport import providers\n",
- "pgr = transport.instance(provider=providers.SQLITE,database='/home/steve/demo.db3',table='friends')\n",
- "_df = pgr.read()\n",
+ "sqr = transport.get.reader(provider=providers.SQLITE,database='/home/steve/demo.db3',table='friends')\n",
+ "_df = sqr.read()\n",
 "_query = 'SELECT COUNT(*) _counts, AVG(age) from friends'\n",
- "_sdf = pgr.read(sql=_query)\n",
+ "_sdf = sqr.read(sql=_query)\n",
 "print (_df)\n",
 "print ('--------- STATISTICS ------------')\n",
 "print (_sdf)"
@@ -87,11 +89,13 @@
 "cell_type": "markdown",
 "metadata": {},
 "source": [
- "The cell bellow show the content of an auth_file, in this case if the dataset/table in question is not to be shared then you can use auth_file with information associated with the parameters.\n",
- "\n",
- "**NOTE**:\n",
- "\n",
- "The auth_file is intended to be **JSON** formatted. This is an overkill for SQLite ;-)"
+ "An **auth-file** is a file that contains the parameters used to access the database.\n",
+ "For code in shared environments, we recommend\n",
+ "\n",
+ "1. keeping the **auth-file** on disk, and\n",
+ "2. setting the location of the file in an environment variable.\n",
+ "\n",
+ "To generate a template of the **auth-file**, open the **file generator wizard** at https://healthcareio.the-phi.com/data-transport"
 ]
},
{

@@ -5,19 +5,16 @@ from setuptools import setup, find_packages
 import os
 import sys
 # from version import __version__,__author__
-from info import __version__, __author__
-# __author__ = 'The Phi Technology'
-# __version__= '1.8.0'
+from info import __version__, __author__,__app_name__,__license__
 def read(fname):
     return open(os.path.join(os.path.dirname(__file__), fname)).read()
 args = {
-    "name":"data-transport",
+    "name":__app_name__,
     "version":__version__,
     "author":__author__,"author_email":"info@the-phi.com",
-    "license":"MIT",
+    "license":__license__,
     # "packages":["transport","info","transport/sql"]},
     "packages": find_packages(include=['info','transport', 'transport.*'])}

@@ -22,8 +22,8 @@ from transport import sql, nosql, cloud, other
 import pandas as pd
 import json
 import os
-from info import __version__,__author__
-from transport.iowrapper import IWriter, IReader
+from info import __version__,__author__,__email__,__license__,__app_name__
+from transport.iowrapper import IWriter, IReader, IETL
 from transport.plugins import PluginLoader
 from transport import providers
@@ -32,26 +32,35 @@ def init():
     global PROVIDERS
     for _module in [cloud,sql,nosql,other] :
         for _provider_name in dir(_module) :
-            if _provider_name.startswith('__') :
+            if _provider_name.startswith('__') or _provider_name == 'common':
                 continue
             PROVIDERS[_provider_name] = {'module':getattr(_module,_provider_name),'type':_module.__name__}
 def instance (**_args):
     """
-    type:
-        read: true|false (default true)
-    auth_file
+    This function returns an object to read from or write to a supported database provider/vendor
+    @provider   provider/vendor to use
+    @context    read/write (default is read)
+    @auth_file: optional, use when the database information is kept in a file (useful for not sharing passwords)
+    kwargs      arguments that are provider/vendor specific
     """
     global PROVIDERS
     if 'auth_file' in _args:
         if os.path.exists(_args['auth_file']) :
+            #
+            # @TODO: add encryption module and decryption to enable this to be secure
+            #
             f = open(_args['auth_file'])
-            _args = dict (_args,** json.loads(f.read()) )
+            #_args = dict (_args,** json.loads(f.read()) )
+            #
+            # we overwrite the file parameters with the arguments passed
+            _args = dict (json.loads(f.read()),**_args )
             f.close()
         else:
             filename = _args['auth_file']
             raise Exception(f" {filename} was not found or is invalid")
-    if _args['provider'] in PROVIDERS :
+    if 'provider' in _args and _args['provider'] in PROVIDERS :
         _info = PROVIDERS[_args['provider']]
         _module = _info['module']
         if 'context' in _args :
@@ -62,22 +71,54 @@ def instance (**_args):
         _agent = _pointer (**_args)
         #
         loader = None
-        if 'plugins' in _args :
-            _params = _args['plugins']
-            if 'path' in _params and 'names' in _params :
-                loader = PluginLoader(**_params)
-            elif type(_params) == list:
-                loader = PluginLoader()
-                for _delegate in _params :
-                    loader.set(_delegate)
+        #
+        # @TODO:
+        # define a logger object here that will be used by the wrapper
+        # this would allow us to know what the data-transport is doing and where/how it fails
+        #
+        # if 'plugins' in _args :
+        #     _params = _args['plugins']
+        #     if 'path' in _params and 'names' in _params :
+        #         loader = PluginLoader(**_params)
+        #     elif type(_params) == list:
+        #         loader = PluginLoader()
+        #         for _delegate in _params :
+        #             loader.set(_delegate)
+        loader = None if 'plugins' not in _args else _args['plugins']
         return IReader(_agent,loader) if _context == 'read' else IWriter(_agent,loader)
     else:
+        #
+        # We can handle the case for an ETL object
+        #
         raise Exception ("Missing or Unknown provider")
     pass
+class get :
+    """
+    This class is just a wrapper to make the interface (API) more conversational and easier to understand
+    """
+    @staticmethod
+    def reader (**_args):
+        _args['context'] = 'read'
+        return instance(**_args)
+    @staticmethod
+    def writer(**_args):
+        """
+        This function is a wrapper that will return a writer to a database. It disambiguates the interface
+        """
+        _args['context'] = 'write'
+        return instance(**_args)
+    @staticmethod
+    def etl (**_args):
+        if 'source' in _args and 'target' in _args :
+            return IETL(**_args)
+        else:
+            raise Exception ("Malformed input found, object must have both 'source' and 'target' attributes")
 def supported ():
     _info = {}
     for _provider in PROVIDERS :
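One behavioral consequence of the reordered dict merge above: values loaded from auth_file no longer override explicit keyword arguments, so a caller can keep credentials in the file and still adjust individual parameters. A short sketch of the resulting API surface (file path, file contents, and the overriding host are illustrative; the source/target values are borrowed from the generate template):

    import transport
    from transport import providers

    # auth.json might contain {"host": "klingon.io", "username": "me", "password": "foobar", ...}
    rdr = transport.get.reader(provider=providers.POSTGRESQL, database='demo', table='friends',
                               auth_file='/path/to/auth.json',
                               host='localhost')  # the explicit host now wins over the file's host
    job = transport.get.etl(source={'provider': 'sqlite', 'database': 'sample.db3', 'table': 'addresses'},
                            target=[{'provider': 'files', 'path': 'addresses.csv', 'delimiter': ','}])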

@@ -39,22 +39,22 @@ import os
 from multiprocessing import Process
-SYS_ARGS = {}
-if len(sys.argv) > 1:
-    N = len(sys.argv)
-    for i in range(1,N):
-        value = None
-        if sys.argv[i].startswith('--'):
-            key = sys.argv[i][2:] #.replace('-','')
-            SYS_ARGS[key] = 1
-            if i + 1 < N:
-                value = sys.argv[i + 1] = sys.argv[i+1].strip()
-            if key and value and not value.startswith('--'):
-                SYS_ARGS[key] = value
-        i += 2
+# SYS_ARGS = {}
+# if len(sys.argv) > 1:
+#     N = len(sys.argv)
+#     for i in range(1,N):
+#         value = None
+#         if sys.argv[i].startswith('--'):
+#             key = sys.argv[i][2:] #.replace('-','')
+#             SYS_ARGS[key] = 1
+#             if i + 1 < N:
+#                 value = sys.argv[i + 1] = sys.argv[i+1].strip()
+#             if key and value and not value.startswith('--'):
+#                 SYS_ARGS[key] = value
+#         i += 2
 class Transporter(Process):
     """
     The transporter (Jason Statham) moves data from one persistent store to another
@@ -74,81 +74,72 @@ class Transporter(Process):
         #
         # Let's ensure we can support multiple targets
         self._target = [self._target] if type(self._target) != list else self._target
         pass
-    def read(self,**_args):
-        """
-        This function
-        """
-        _reader = transport.factory.instance(**self._source)
+    def run(self):
+        _reader = transport.get.etl(source=self._source,target=self._target)
         #
-        # If arguments are provided then a query is to be executed (not just a table dump)
         if 'cmd' in self._source or 'query' in self._source :
             _query = self._source['cmd'] if 'cmd' in self._source else self._source['query']
             return _reader.read(**_query)
         else:
             return _reader.read()
-        # return _reader.read() if 'query' not in self._source else _reader.read(**self._source['query'])
-    def _delegate_write(self,_data,**_args):
-        """
-        This function will write a data-frame to a designated data-store, The function is built around a delegation design pattern
-        :data   data-frame or object to be written
-        """
-        if _data.shape[0] > 0 :
-            for _target in self._target :
-                if 'write' not in _target :
-                    _target['context'] = 'write'
-                    # _target['lock'] = True
-                else:
-                    # _target['write']['lock'] = True
-                    pass
-                _writer = transport.factory.instance(**_target)
-                _writer.write(_data,**_args)
-                if hasattr(_writer,'close') :
-                    _writer.close()
-    def write(self,_df,**_args):
-        """
-        """
-        SEGMENT_COUNT = 6
-        MAX_ROWS = 1000000
-        # _df = self.read()
-        _segments = np.array_split(np.arange(_df.shape[0]),SEGMENT_COUNT) if _df.shape[0] > MAX_ROWS else np.array( [np.arange(_df.shape[0])])
-        # _index = 0
-        for _indexes in _segments :
-            _fwd_args = {} if not _args else _args
-            self._delegate_write(_df.iloc[_indexes],**_fwd_args)
-            time.sleep(1)
-        #
-        # @TODO: Perhaps consider writing up each segment in a thread/process (speeds things up?)
-        pass
-def instance(**_args):
-    _proxy = lambda _agent: _agent.write(_agent.read())
-    if 'source' in _args and 'target' in _args :
-        _agent = Transporter(**_args)
-        _proxy(_agent)
-    else:
-        _config = _args['config']
-        _items = [Transporter(**_item) for _item in _config ]
-        _MAX_JOBS = 5
-        _items = np.array_split(_items,_MAX_JOBS)
-        for _batch in _items :
-            jobs = []
-            for _item in _batch :
-                thread = Process(target=_proxy,args = (_item,))
-                thread.start()
-                jobs.append(thread)
-            while jobs :
-                jobs = [thread for thread in jobs if thread.is_alive()]
-                time.sleep(1)
+    # def _read(self,**_args):
+    #     """
+    #     This function
+    #     """
+    #     _reader = transport.factory.instance(**self._source)
+    #     #
+    #     # If arguments are provided then a query is to be executed (not just a table dump)
+    #     if 'cmd' in self._source or 'query' in self._source :
+    #         _query = self._source['cmd'] if 'cmd' in self._source else self._source['query']
+    #         return _reader.read(**_query)
+    #     else:
+    #         return _reader.read()
+    #     # return _reader.read() if 'query' not in self._source else _reader.read(**self._source['query'])
+    # def _delegate_write(self,_data,**_args):
+    #     """
+    #     This function will write a data-frame to a designated data-store, The function is built around a delegation design pattern
+    #     :data   data-frame or object to be written
+    #     """
+    #     if _data.shape[0] > 0 :
+    #         for _target in self._target :
+    #             if 'write' not in _target :
+    #                 _target['context'] = 'write'
+    #                 # _target['lock'] = True
+    #             else:
+    #                 # _target['write']['lock'] = True
+    #                 pass
+    #             _writer = transport.factory.instance(**_target)
+    #             _writer.write(_data,**_args)
+    #             if hasattr(_writer,'close') :
+    #                 _writer.close()
+    # def write(self,_df,**_args):
+    #     """
+    #     """
+    #     SEGMENT_COUNT = 6
+    #     MAX_ROWS = 1000000
+    #     # _df = self.read()
+    #     _segments = np.array_split(np.arange(_df.shape[0]),SEGMENT_COUNT) if _df.shape[0] > MAX_ROWS else np.array( [np.arange(_df.shape[0])])
+    #     # _index = 0
+    #     for _indexes in _segments :
+    #         _fwd_args = {} if not _args else _args
+    #         self._delegate_write(_df.iloc[_indexes],**_fwd_args)
+    #         time.sleep(1)
+    #     #
+    #     # @TODO: Perhaps consider writing up each segment in a thread/process (speeds things up?)
+    #     pass
+def instance(**_args):
+    pthread = Transporter (**_args)
+    pthread.start()
+    return pthread
 pass
 # class Post(Process):
 #     def __init__(self,**args):
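The new `instance` is deliberately non-blocking: it starts the Transporter process and hands it back, and the caller decides how to wait (the CLI's `apply` polls `is_alive()`). A minimal sketch, reusing the source/target shapes from the generate template (values are illustrative):

    from transport import etl

    pthread = etl.instance(
        source={'provider': 'sqlite', 'database': 'sample.db3', 'table': 'addresses'},
        target=[{'provider': 'files', 'path': 'addresses.csv', 'delimiter': ','}])  # starts immediately
    pthread.join()  # or poll pthread.is_alive(), as the CLI's apply command does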

@@ -1,14 +1,39 @@
 """
 This class is a wrapper around read/write classes of cloud,sql,nosql,other packages
-The wrapper allows for application of plugins as pre-post conditions
+The wrapper allows for application of plugins as pre-post conditions.
+NOTE: Plugins are converted to a pipeline, so we apply a pipeline when reading or writing:
+    - upon initialization we will load plugins
+    - on read/write we apply a pipeline (if passed as an argument)
 """
+from transport.plugins import plugin, PluginLoader
+import transport
+from transport import providers
+from multiprocessing import Process
+import time
 class IO:
     """
-    Base wrapper class for read/write
+    Base wrapper class for read/write and support for logs
     """
     def __init__(self,_agent,plugins):
         self._agent = _agent
-        self._plugins = plugins
+        if plugins :
+            self._init_plugins(plugins)
+        else:
+            self._plugins = None
+    def _init_plugins(self,_args):
+        """
+        This function will load pipelined functions as a plugin loader
+        """
+        if 'path' in _args and 'names' in _args :
+            self._plugins = PluginLoader(**_args)
+        else:
+            self._plugins = PluginLoader()
+            [self._plugins.set(_pointer) for _pointer in _args]
+        #
+        # @TODO: We should have a way to log what plugins are loaded and ready to use
     def meta (self,**_args):
         if hasattr(self._agent,'meta') :
             return self._agent.meta(**_args)
@@ -28,9 +53,14 @@ class IO:
             return self._agent.apply(_query)
         return None
 class IReader(IO):
+    """
+    This is a wrapper for read functionalities
+    """
     def __init__(self,_agent,pipeline=None):
         super().__init__(_agent,pipeline)
     def read(self,**_args):
+        if 'plugins' in _args :
+            self._init_plugins(_args['plugins'])
         _data = self._agent.read(**_args)
         if self._plugins and self._plugins.ratio() > 0 :
             _data = self._plugins.apply(_data)
@@ -41,7 +71,43 @@ class IWriter(IO):
     def __init__(self,_agent,pipeline=None):
         super().__init__(_agent,pipeline)
     def write(self,_data,**_args):
+        if 'plugins' in _args :
+            self._init_plugins(_args['plugins'])
         if self._plugins and self._plugins.ratio() > 0 :
             _data = self._plugins.apply(_data)
         self._agent.write(_data,**_args)
+#
+# The ETL object in its simplest form is an aggregation of read/write objects
+# @TODO: ETL can/should aggregate a writer as a plugin and apply it as a process
+class IETL(IReader) :
+    """
+    This class performs an ETL operation by inheriting a read and adding writes as pipeline functions
+    """
+    def __init__(self,**_args):
+        super().__init__(transport.get.reader(**_args['source']))
+        if 'target' in _args:
+            self._targets = _args['target'] if type(_args['target']) == list else [_args['target']]
+        else:
+            self._targets = []
+        self.jobs = []
+        #
+        # If the parent is already multiprocessing
+        self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess']
+    def read(self,**_args):
+        _data = super().read(**_args)
+        for _kwargs in self._targets :
+            self.post(_data,**_kwargs)
+        return _data
+    def post (self,_data,**_args) :
+        """
+        This function performs the write operation against one target
+        :_args  parameters associated with the writer object
+        """
+        writer = transport.get.writer(**_args)
+        writer.write(_data)
+        writer.close()
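IETL is what makes the new ETL path composition-based: it is a reader whose read() also fans the frame out to every configured target. A sketch of direct use (source/target values are illustrative, borrowed from the generate template):

    import transport

    _etl = transport.get.etl(
        source={'provider': 'sqlite', 'database': 'sample.db3', 'table': 'addresses'},
        target=[{'provider': 'files', 'path': 'addresses.csv', 'delimiter': ','}])
    _df = _etl.read()  # reads from the source, then post()s the frame to each target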

@@ -53,8 +53,8 @@ class Writer (File):
         """
         try:
-            _delim = self._delimiter if 'delimiter' not in _args else _args['delimiter']
-            _path = self._path if 'path' not in _args else _args['path']
+            _delim = self.delimiter if 'delimiter' not in _args else _args['delimiter']
+            _path = self.path if 'path' not in _args else _args['path']
             _mode = self._mode if 'mode' not in _args else _args['mode']
             info.to_csv(_path,index=False,sep=_delim)
@@ -62,6 +62,7 @@ class Writer (File):
         except Exception as e:
             #
             # Not sure what should be done here ...
+            print (e)
             pass
         finally:
             # DiskWriter.THREAD_LOCK.release()

@@ -25,9 +25,9 @@ class plugin :
         self._name = _args['name']
         self._about = _args['about']
         self._mode = _args['mode'] if 'mode' in _args else 'rw'
-    def __call__(self,pointer):
-        def wrapper(_args):
-            return pointer(_args)
+    def __call__(self,pointer,**kwargs):
+        def wrapper(_args,**kwargs):
+            return pointer(_args,**kwargs)
         #
         # @TODO:
         # add attributes to the wrapper object
@@ -55,6 +55,7 @@ class PluginLoader :
         self._names = []
         if path and os.path.exists(path) and _names:
+            for _name in self._names :
                 spec = importlib.util.spec_from_file_location('private', path)
                 module = importlib.util.module_from_spec(spec)
                 spec.loader.exec_module(module) #--loads it into sys.modules
@@ -101,7 +102,7 @@ class PluginLoader :
         return _name in self._modules
     def ratio (self):
         """
-        how many modules loaded vs unloaded given the list of names
+        This function determines how many modules are loaded vs unloaded given the list of names
         """
         _n = len(self._names)
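With `**kwargs` now threaded through the wrapper, a decorated plugin can accept parameters at call time. A sketch of such a plugin, assuming the decorator is applied with the name/about attributes read in the constructor above (the function body is illustrative):

    from transport.plugins import plugin

    @plugin(name='autocast', about='cast the age column to int', mode='rw')
    def autocast(_data, **kwargs):
        # _data is the data-frame flowing through the pipeline; kwargs now pass through the wrapper
        _data['age'] = _data['age'].astype(int)
        return _data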
