@ -24,19 +24,28 @@ from multiprocessing import Process
import os
import os
import transport
import transport
from transport import etl
# from transport import etl
from transport.iowrapper import IETL
# from transport import providers
# from transport import providers
import typer
import typer
from typing_extensions import Annotated
from typing_extensions import Annotated
from typing import Optional
from typing import Optional
import time
import time
from termcolor import colored
from termcolor import colored
from enum import Enum
from rich import print
import plugin_ix as pix
app = typer.Typer()
app = typer.Typer()
app_e = typer.Typer() #-- handles etl (run, generate)
app_x = typer.Typer() #-- handles plugins (list,add, test)
app_i = typer.Typer() #-- handles information (version, license)
app_r = typer.Typer() #-- handles registry
REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport'])
REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport'])
REGISTRY_FILE= 'transport-registry.json'
REGISTRY_FILE= 'transport-registry.json'
CHECK_MARK = ' '.join(['[',colored(u'\u2713', 'green'),']'])
CHECK_MARK = '[ [green]\u2713[/green] ]' #' '.join(['[',colored(u'\u2713', 'green'),']'])
TIMES_MARK= ' '.join(['[',colored(u'\u2717','red'),']'])
TIMES_MARK= '[ [red]\u2717[/red] ]' #' '.join(['[',colored(u'\u2717','red'),']'])
# @app.command()
# @app.command()
def help() :
def help() :
print (__doc__)
print (__doc__)
@ -44,10 +53,15 @@ def wait(jobs):
while jobs :
while jobs :
jobs = [thread for thread in jobs if thread.is_alive()]
jobs = [thread for thread in jobs if thread.is_alive()]
time.sleep(1)
time.sleep(1)
def wait (jobs):
while jobs :
jobs = [pthread for pthread in jobs if pthread.is_alive()]
@app.command(name="apply")
@app_e.command(name="run ")
def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")],
def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")],
index:int = typer.Option(default= None, help="index of the item of interest, otherwise everything in the file will be processed")):
index:int = typer.Option(default= None, help="index of the item of interest, otherwise everything in the file will be processed"),
batch:int = typer.Option(default=5, help="The number of parallel processes to run at once")
):
"""
"""
This function applies data transport ETL feature to read data from one source to write it one or several others
This function applies data transport ETL feature to read data from one source to write it one or several others
"""
"""
@ -56,23 +70,34 @@ def apply (path:Annotated[str,typer.Argument(help="path of the configuration fil
file = open(path)
file = open(path)
_config = json.loads (file.read() )
_config = json.loads (file.read() )
file.close()
file.close()
if index :
if index is not None :
_config = [_config[ int(index)]]
_config = [_config[ int(index)]]
jobs = []
jobs = []
for _args in _config :
for _args in _config :
pthread = etl.instance(**_args) #-- automatically starts the process
# pthread = etl.instance(**_args) #-- automatically starts the process
def bootup ():
_worker = IETL(**_args)
_worker.run()
pthread = Process(target=bootup)
pthread.start()
jobs.append(pthread)
jobs.append(pthread)
if len(jobs) == batch :
wait(jobs)
jobs = []
if jobs :
wait (jobs)
#
#
# @TODO: Log the number of processes started and estimated time
# @TODO: Log the number of processes started and estfrom transport impfrom transport imp imated time
while jobs :
# while jobs :
jobs = [pthread for pthread in jobs if pthread.is_alive()]
# jobs = [pthread for pthread in jobs if pthread.is_alive()]
time.sleep(1)
# time.sleep(1)
#
#
# @TODO: Log the job termination here ...
# @TODO: Log the job termination here ...
@app.command(name="providers ")
@app_i.command(name="supported ")
def supported (format:Annotated[str,typer.Argument(help="format of the output, supported formats are (list,table,json)")]="table") :
def supported (format:Annotated[str,typer.Argument(help="format of the output, supported formats are (list,table,json)")]="table") :
"""
"""
This function will print supported providers/vendors and their associated classification s
This function will print supported database technologie s
"""
"""
_df = (transport.supported())
_df = (transport.supported())
if format in ['list','json'] :
if format in ['list','json'] :
@ -80,17 +105,26 @@ def supported (format:Annotated[str,typer.Argument(help="format of the output, s
else:
else:
print (_df)
print (_df)
print ()
print ()
@app_i.command(name="version")
@app.command()
def version ():
def version():
"""
This function will return the version of the data-transport
"""
print()
print (f'[bold] {transport.__app_name__} ,[blue] {transport.__edition__} edition [/blue], version {transport.__version__}[/bold]')
print ()
@app_i.command(name="license")
def info():
"""
"""
This function will display version and license information
This function will display version and license information
"""
"""
print()
print (transport.__app_name__,'version ',transport.__version__)
print (f'[bold] {transport.__app_name__} ,{transport.__edition__}, version {transport.__version__}[/bold]')
print ()
print (transport.__license__)
print (transport.__license__)
@app.command()
@app_e .command()
def generate (path:Annotated[str,typer.Argument(help="path of the ETL configuration file template (name included)")]):
def generate (path:Annotated[str,typer.Argument(help="path of the ETL configuration file template (name included)")]):
"""
"""
This function will generate a configuration template to give a sense of how to create one
This function will generate a configuration template to give a sense of how to create one
@ -99,45 +133,45 @@ def generate (path:Annotated[str,typer.Argument(help="path of the ETL configurat
{
{
"source":{"provider":"http","url":"https://raw.githubusercontent.com/codeforamerica/ohana-api/master/data/sample-csv/addresses.csv"},
"source":{"provider":"http","url":"https://raw.githubusercontent.com/codeforamerica/ohana-api/master/data/sample-csv/addresses.csv"},
"target":
"target":
[{"provider":"files","path":"addresses.csv","delimiter":","},{"provider":"sqlite","database":"sample.db3","table":"addresses"}]
[{"provider":"files","path":"addresses.csv","delimiter":","},{"provider":"sqlite3 ","database":"sample.db3","table":"addresses"}]
}
}
]
]
file = open(path,'w')
file = open(path,'w')
file.write(json.dumps(_config))
file.write(json.dumps(_config))
file.close()
file.close()
print (f"""{CHECK_MARK} Successfully generated a template ETL file at {path}""" )
print (f"""{CHECK_MARK} Successfully generated a template ETL file at [bold] {path}[/bold] """ )
print ("""NOTE: Each line (source or target) is the content of an auth-file""")
print ("""NOTE: Each line (source or target) is the content of an auth-file""")
@app.command(name="ini t")
@app_r.command(name="rese t")
def initregistry (email:Annotated[str,typer.Argument(help="email")],
def initregistry (email:Annotated[str,typer.Argument(help="email")],
path:str=typer.Option(default=REGISTRY_PATH,help="path or location of the configuration file"),
path:str=typer.Option(default=REGISTRY_PATH,help="path or location of the configuration file"),
override:bool=typer.Option(default=False,help="override existing configuration or not")):
override:bool=typer.Option(default=False,help="override existing configuration or not")):
"""
"""
This functiion will initialize the registry and have both application and calling code loading the database parameters by a label
This functiion will initialize the data-transport registry and have both application and calling code loading the database parameters by a label
"""
"""
try:
try:
transport.registry.init(email=email, path=path, override=override)
transport.registry.init(email=email, path=path, override=override)
_msg = f"""{CHECK_MARK} Successfully wrote configuration to {path} from {email}"""
_msg = f"""{CHECK_MARK} Successfully wrote configuration to [bold] {path}[/bold] from [bold] {email}[/bold] """
except Exception as e:
except Exception as e:
_msg = f"{TIMES_MARK} {e}"
_msg = f"{TIMES_MARK} {e}"
print (_msg)
print (_msg)
print ()
print ()
@app.command(name="register ")
@app_r.command(name="add ")
def register (label:Annotated[str,typer.Argument(help="unique label that will be used to load the parameters of the database")],
def register (label:Annotated[str,typer.Argument(help="unique label that will be used to load the parameters of the database")],
auth_file:Annotated[str,typer.Argument(help="path of the auth_file")],
auth_file:Annotated[str,typer.Argument(help="path of the auth_file")],
default:bool=typer.Option(default=False,help="set the auth_file as default"),
default:bool=typer.Option(default=False,help="set the auth_file as default"),
path:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry file")):
path:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry file")):
"""
"""
This function will register an auth-file i.e database connection and assign it a label,
This function add a database label for a given auth-file. which allows access to the database using a label of your choice.
Learn more about auth-file at https://healthcareio.the-phi.com/data-transport
"""
"""
try:
try:
if transport.registry.exists(path) :
if transport.registry.exists(path) :
transport.registry.set(label=label,auth_file=auth_file, default=default, path=path)
transport.registry.set(label=label,auth_file=auth_file, default=default, path=path)
_msg = f"""{CHECK_MARK} Successfully added label "{label}" to data-transport registry"""
_msg = f"""{CHECK_MARK} Successfully added label [bold] "{label}"[/bold] to data-transport registry"""
else:
else:
_msg = f"""{TIMES_MARK} Registry is not initialized, please initialize the registry (check help)"""
_msg = f"""{TIMES_MARK} Registry is not initialized, please initialize the registry (check help)"""
except Exception as e:
except Exception as e:
@ -145,6 +179,68 @@ def register (label:Annotated[str,typer.Argument(help="unique label that will be
print (_msg)
print (_msg)
pass
pass
@app_x.command(name='add')
def register_plugs (
alias:Annotated[str,typer.Argument(help="unique function name within a file")],
path:Annotated[str,typer.Argument(help="path of the python file, that contains functions")],
folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry folder"),
):
"""
This function will register a file and the functions within we are interested in using
"""
if ',' in alias :
alias = [_name.strip() for _name in alias.split(',') if _name.strip() != '' ]
else:
alias = [alias.strip()]
_pregistry = pix.Registry(folder=folder,plugin_folder='plugins/code')
_log = _pregistry.set(path,alias)
# transport.registry.plugins.init()
# _log = transport.registry.plugins.add(alias,path)
_mark = TIMES_MARK if not _log else CHECK_MARK
_msg = f"""Could NOT add the [bold]{alias}[/bold]to the registry""" if not _log else f""" successfully added {alias}, {_log} functions registered"""
print (f"""{_mark} {_msg}""")
@app_x.command(name="list")
def registry_list (folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport configuration folder")):
"""
This function will list all the plugins (python functions/files) that are registered and can be reused
"""
_pregistry = pix.Registry(folder=folder)
_df = _pregistry.stats()
if _df.empty :
print (f"{TIMES_MARK} registry at {folder} is not ready")
else:
print (_df)
@app_x.command ("has")
def registry_has (alias:Annotated[str,typer.Argument(help="alias of a function function@file or file.function")],
folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry file")) :
_pregistry = pix.Registry(folder=folder)
if _pregistry.has(alias) :
_msg = f"{CHECK_MARK} {alias} was [bold] found [/bold] in registry "
else:
_msg = f"{TIMES_MARK} {alias} was [bold] NOT found [/bold] in registry "
print (_msg)
@app_x.command(name="test")
def registry_test (alias:Annotated[str,typer.Argument(help="alias of a function function@file or file.function")],
folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry folder")) :
_pregistry = pix.Registry(folder=folder)
"""
This function allows to test syntax for a plugin i.e in terms of alias@function
"""
# _item = transport.registry.plugins.has(key=key)
_pointer = _pregistry.get(alias) if _pregistry.has(alias) else None
if _pointer:
print (f"""{CHECK_MARK} successfully loaded [bold] {alias}[/bold] found in {folder}""")
else:
print (f"{TIMES_MARK} unable to load {alias}. Make sure it is registered")
app.add_typer(app_e,name='etl',help="This function will run etl or generate a template etl configuration file")
app.add_typer(app_r,name='registry',help='This function allows labeling database access information')
app.add_typer(app_i,name="info",help="This function will print either license or supported database technologies")
app.add_typer(app_x, name="plugins",help="This function enables add/list/test of plugins in the registry")
if __name__ == '__main__' :
if __name__ == '__main__' :
app()
app()