@ -24,19 +24,28 @@ from multiprocessing import Process
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					import os
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					import transport
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					from transport import etl
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					# from transport import etl
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					from transport.iowrapper import IETL
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					# from transport import providers
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					import typer
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					from typing_extensions import Annotated
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					from typing import Optional
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					import time
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					from termcolor import colored
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					from enum import Enum
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					from rich import print
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					import plugin_ix as pix
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					app = typer.Typer()
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					app_e = typer.Typer()   #-- handles etl (run, generate)
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					app_x = typer.Typer()   #-- handles plugins (list,add, test)
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					app_i = typer.Typer()   #-- handles information (version, license)
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					app_r = typer.Typer()   #-- handles registry    
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport'])
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					REGISTRY_FILE= 'transport-registry.json'
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					CHECK_MARK = ' '.join(['[',colored(u'\u2713', 'green'),']'])
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					TIMES_MARK= ' '.join(['[',colored(u'\u2717','red'),']'])
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					CHECK_MARK = '[ [green]\u2713[/green] ]' #'  '.join(['[',colored(u'\u2713', 'green'),']'])
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					TIMES_MARK= '[ [red]\u2717[/red] ]' #'  '.join(['[',colored(u'\u2717','red'),']'])
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					# @app.command()
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					def help() :     
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
						print (__doc__)
 
				
			 
			
		
	
	
		
			
				
					
						
						
						
							
								 
							 
						
					 
				
				 
				 
				
					@ -44,10 +53,15 @@ def wait(jobs):
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    while jobs :
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        jobs = [thread for thread in jobs if thread.is_alive()]
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        time.sleep(1)
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					def wait (jobs):
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    while jobs :
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					            jobs = [pthread for pthread in jobs if pthread.is_alive()]
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					@app.command(name="apply")
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					@app_e.command(name="run ")
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")],
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        index:int = typer.Option(default= None, help="index of the item of interest, otherwise everything in the file will be processed")):
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        index:int = typer.Option(default= None, help="index of the item of interest, otherwise everything in the file will be processed"),
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        batch:int = typer.Option(default=5, help="The number of parallel processes to run at once")
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        ):
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    """
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    This function applies data transport ETL feature to read data from one source to write it one or several others
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    """
 
				
			 
			
		
	
	
		
			
				
					
						
						
						
							
								 
							 
						
					 
				
				 
				 
				
					@ -56,23 +70,34 @@ def apply (path:Annotated[str,typer.Argument(help="path of the configuration fil
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        file = open(path)
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        _config = json.loads (file.read() )
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        file.close()
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        if index :
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        if index is not None :             
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					            _config = [_config[ int(index)]]
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        jobs = []             
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        jobs = []          
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        for _args in _config :
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					            pthread = etl.instance(**_args) #-- automatically starts the process
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					            # pthread = etl.instance(**_args) #-- automatically starts the process
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					            def bootup ():
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					                _worker = IETL(**_args)
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					                _worker.run()
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					            pthread = Process(target=bootup)
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					            pthread.start()
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					            jobs.append(pthread)
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					            if len(jobs) == batch :
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					                wait(jobs)
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					                jobs = []
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        if jobs :
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					            wait (jobs)
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        #
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        # @TODO: Log the number of processes started and estimated time
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        while jobs :
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					             jobs = [pthread for pthread in jobs if pthread.is_alive()]
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					             time.sleep(1)
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        # @TODO: Log the number of processes started and estfrom transport impfrom transport imp imated time
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        #  while jobs :
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        #       jobs = [pthread for pthread in jobs if pthread.is_alive()]
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        #       time.sleep(1)
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        #
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        # @TODO: Log the job termination here ...
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					@app.command(name="providers ")
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					@app_i.command(name="supported ")
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					def supported (format:Annotated[str,typer.Argument(help="format of the output, supported formats are (list,table,json)")]="table") :
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    """
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    This function will print supported providers/vendors and their associated classification s
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    This function will print supported database technologie s
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    """
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    _df =  (transport.supported())
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    if format in ['list','json'] :
 
				
			 
			
		
	
	
		
			
				
					
						
						
						
							
								 
							 
						
					 
				
				 
				 
				
					@ -80,17 +105,26 @@ def supported (format:Annotated[str,typer.Argument(help="format of the output, s
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    else:
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					         print (_df)
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    print ()
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					@app.command()
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					def version():
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					@app_i.command(name="version")
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					def version ():
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    """
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    This function will return the version of the data-transport
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    """
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    print()
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    print (f'[bold] {transport.__app_name__} ,[blue] {transport.__edition__} edition [/blue], version {transport.__version__}[/bold]')
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    print ()
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					 
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					@app_i.command(name="license")
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					def info():
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    """
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    This function will display version and license information
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    """
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    print (transport.__app_name__,'version ',transport.__version__)
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    print()
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    print (f'[bold] {transport.__app_name__} ,{transport.__edition__}, version {transport.__version__}[/bold]')
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    print ()
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    print (transport.__license__)
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					@app.command()
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					@app_e .command()
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					def generate (path:Annotated[str,typer.Argument(help="path of the ETL configuration file template (name included)")]):
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    """
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    This function will generate a configuration template to give a sense of how to create one
 
				
			 
			
		
	
	
		
			
				
					
						
						
						
							
								 
							 
						
					 
				
				 
				 
				
					@ -99,45 +133,45 @@ def generate (path:Annotated[str,typer.Argument(help="path of the ETL configurat
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					            {
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					                "source":{"provider":"http","url":"https://raw.githubusercontent.com/codeforamerica/ohana-api/master/data/sample-csv/addresses.csv"},
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					                "target":
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					            [{"provider":"files","path":"addresses.csv","delimiter":","},{"provider":"sqlite","database":"sample.db3","table":"addresses"}]
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					            [{"provider":"files","path":"addresses.csv","delimiter":","},{"provider":"sqlite3 ","database":"sample.db3","table":"addresses"}]
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					            }
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					            ]
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    file = open(path,'w')
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    file.write(json.dumps(_config))
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    file.close()
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    print (f"""{CHECK_MARK} Successfully generated a template ETL file at {path}""" )
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    print (f"""{CHECK_MARK} Successfully generated a template ETL file at [bold] {path}[/bold] """ )
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    print ("""NOTE: Each line (source or target) is the content of an auth-file""")
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					@app.command(name="ini t")
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					@app_r.command(name="rese t")
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					def initregistry (email:Annotated[str,typer.Argument(help="email")],
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					                  path:str=typer.Option(default=REGISTRY_PATH,help="path or location of the configuration file"), 
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					                  override:bool=typer.Option(default=False,help="override existing configuration or not")):
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    """
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    This functiion will initialize the registry and have both application and calling code loading the database parameters by a label
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    This functiion will initialize the data-transport  registry and have both application and calling code loading the database parameters by a label
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    """
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    try:
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        transport.registry.init(email=email, path=path, override=override)
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        _msg = f"""{CHECK_MARK} Successfully wrote configuration to {path} from {email}"""
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        _msg = f"""{CHECK_MARK} Successfully wrote configuration to [bold] {path}[/bold]  from [bold] {email}[/bold] """
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    except Exception as e:
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        _msg = f"{TIMES_MARK} {e}"
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    print (_msg)
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    print ()
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					@app.command(name="register ")
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					@app_r.command(name="add ")
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					def register (label:Annotated[str,typer.Argument(help="unique label that will be used to load the parameters of the database")],
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					              auth_file:Annotated[str,typer.Argument(help="path of the auth_file")],
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					              default:bool=typer.Option(default=False,help="set the auth_file as default"),
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					              path:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry file")):
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    """
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    This function will register an auth-file i.e database connection and assign it a label,  
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    Learn more about auth-file at https://healthcareio.the-phi.com/data-transport 
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    This function add  a database label for a given auth-file. which allows access to the database using a label of your choice. 
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    """
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    try:
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        if transport.registry.exists(path) :
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					            transport.registry.set(label=label,auth_file=auth_file, default=default, path=path)
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					            _msg = f"""{CHECK_MARK} Successfully added label "{label}" to data-transport registry"""
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					            _msg = f"""{CHECK_MARK} Successfully added label [bold] "{label}"[/bold]  to data-transport registry"""
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        else:
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					            _msg = f"""{TIMES_MARK} Registry is not initialized, please initialize the registry (check help)"""
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    except Exception as e:
 
				
			 
			
		
	
	
		
			
				
					
						
						
						
							
								 
							 
						
					 
				
				 
				 
				
					@ -145,6 +179,68 @@ def register (label:Annotated[str,typer.Argument(help="unique label that will be
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    print (_msg)
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    pass
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					@app_x.command(name='add') 
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					def register_plugs (
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    alias:Annotated[str,typer.Argument(help="unique function name within a file")],
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    path:Annotated[str,typer.Argument(help="path of the python file, that contains functions")],
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry folder"),
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    ):
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    """
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    This function will register a file and the functions within we are interested in using
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    """
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    if ',' in alias :
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        alias = [_name.strip() for _name in alias.split(',') if _name.strip() != '' ] 
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    else:
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        alias = [alias.strip()]
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    _pregistry  = pix.Registry(folder=folder,plugin_folder='plugins/code')
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    _log = _pregistry.set(path,alias)
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    # transport.registry.plugins.init()
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    # _log = transport.registry.plugins.add(alias,path)
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    _mark = TIMES_MARK if not _log else CHECK_MARK
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    _msg  = f"""Could NOT add the [bold]{alias}[/bold]to the registry""" if not _log else f""" successfully added {alias}, {_log} functions registered"""
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    print (f"""{_mark} {_msg}""")
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					@app_x.command(name="list") 
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					def registry_list (folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport configuration folder")):
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    """
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    This function will list all the plugins (python functions/files) that are registered and can be reused
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    """
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    _pregistry  = pix.Registry(folder=folder)
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    _df = _pregistry.stats()
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    if _df.empty :
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        print (f"{TIMES_MARK} registry at {folder} is not ready")
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    else:
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        print (_df)
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					@app_x.command ("has")
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					def registry_has (alias:Annotated[str,typer.Argument(help="alias of a function function@file or file.function")],
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					                  folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry file")) :
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    _pregistry  = pix.Registry(folder=folder)
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    if _pregistry.has(alias) :
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        _msg = f"{CHECK_MARK} {alias} was [bold] found [/bold] in registry "
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    else:
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        _msg = f"{TIMES_MARK} {alias} was [bold] NOT found [/bold] in registry "
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    print (_msg)
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					@app_x.command(name="test") 
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					def registry_test (alias:Annotated[str,typer.Argument(help="alias of a function function@file or file.function")],
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					                  folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry folder")) :
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    _pregistry  = pix.Registry(folder=folder)
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    """
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    This function allows to test syntax for a plugin i.e in terms of alias@function
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    """
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    # _item = transport.registry.plugins.has(key=key)
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    _pointer = _pregistry.get(alias) if _pregistry.has(alias) else None
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    if _pointer:
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        print (f"""{CHECK_MARK} successfully loaded [bold] {alias}[/bold] found in {folder}""")
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					    else:
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					        print (f"{TIMES_MARK} unable to load {alias}. Make sure it is registered")
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					app.add_typer(app_e,name='etl',help="This function will run etl or generate a template etl configuration file")
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					app.add_typer(app_r,name='registry',help='This function allows labeling database access information')
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					app.add_typer(app_i,name="info",help="This function will print either license or supported database technologies")
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					app.add_typer(app_x, name="plugins",help="This function enables add/list/test of plugins in the registry")
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					if __name__ == '__main__' :
 
				
			 
			
		
	
		
			
				
					 
					 
				
				 
				 
				
					     app()