You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
	
	
		
			162 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Python
		
	
			
		
		
	
	
			162 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Python
		
	
"""
 | 
						|
The functions within are designed to load external files and apply functions against the data
 | 
						|
The plugins are applied as 
 | 
						|
    - post-processing if we are reading data 
 | 
						|
    - and pre-processing if we are writing data
 | 
						|
 | 
						|
The plugin will use a decorator to identify meaningful functions
 | 
						|
@TODO: This should work in tandem with loggin (otherwise we don't have visibility into what is going on)
 | 
						|
"""
 | 
						|
import importlib as IL
 | 
						|
import importlib.util
 | 
						|
import sys
 | 
						|
import os
 | 
						|
import pandas as pd
 | 
						|
import time
 | 
						|
 | 
						|
class Plugin :
 | 
						|
    """
 | 
						|
    Implementing function decorator for data-transport plugins (post-pre)-processing
 | 
						|
    """
 | 
						|
    def __init__(self,**_args):
 | 
						|
        """
 | 
						|
        :name   name of the plugin
 | 
						|
        :mode   restrict to reader/writer
 | 
						|
        :about  tell what the function is about    
 | 
						|
        """
 | 
						|
        self._name = _args['name'] if 'name' in _args else None
 | 
						|
        self._version = _args['version'] if 'version' in _args else '0.1'
 | 
						|
        self._doc = _args['doc'] if 'doc' in _args else "N/A"
 | 
						|
        self._mode = _args['mode'] if 'mode' in _args else 'rw'
 | 
						|
    def __call__(self,pointer,**kwargs):
 | 
						|
        def wrapper(_args,**kwargs):
 | 
						|
            return pointer(_args,**kwargs)
 | 
						|
        #
 | 
						|
        # @TODO:
 | 
						|
        # add attributes to the wrapper object
 | 
						|
        #
 | 
						|
        self._name = pointer.__name__ if not self._name else self._name
 | 
						|
        setattr(wrapper,'transport',True)
 | 
						|
        setattr(wrapper,'name',self._name)
 | 
						|
        setattr(wrapper,'version',self._version)
 | 
						|
        setattr(wrapper,'doc',self._doc)
 | 
						|
        return wrapper
 | 
						|
 | 
						|
class PluginLoader :
 | 
						|
    """
 | 
						|
    This class is intended to load a plugin and make it available and assess the quality of the developed plugin
 | 
						|
    """
 | 
						|
   
 | 
						|
    def __init__(self,**_args):
 | 
						|
        """
 | 
						|
        """
 | 
						|
        # _names = _args['names'] if 'names' in _args else None
 | 
						|
        # path = _args['path'] if 'path' in _args else None
 | 
						|
        # self._names = _names if type(_names) == list else [_names]
 | 
						|
        self._modules = {}
 | 
						|
        self._names = []
 | 
						|
        self._registry = _args['registry']
 | 
						|
 | 
						|
        pass
 | 
						|
    def load (self,**_args):
 | 
						|
        """
 | 
						|
        This function loads a plugin
 | 
						|
        """
 | 
						|
        self._modules = {}
 | 
						|
        self._names = []
 | 
						|
        path = _args ['path']
 | 
						|
        if os.path.exists(path) :
 | 
						|
            _alias = path.split(os.sep)[-1]
 | 
						|
            spec = importlib.util.spec_from_file_location(_alias, path)
 | 
						|
            module = importlib.util.module_from_spec(spec)
 | 
						|
            spec.loader.exec_module(module) #--loads it into sys.modules
 | 
						|
            for _name in dir(module) :
 | 
						|
                if self.isplugin(module,_name) :
 | 
						|
                    self._module[_name] = getattr(module,_name)
 | 
						|
                    # self._names [_name]
 | 
						|
    def format (self,**_args):
 | 
						|
        uri = _args['alias'],_args['name']
 | 
						|
    # def set(self,_pointer) :
 | 
						|
    def set(self,_key) :
 | 
						|
        """
 | 
						|
        This function will set a pointer to the list of modules to be called
 | 
						|
        This should be used within the context of using the framework as a library
 | 
						|
        """
 | 
						|
        if type(_key).__name__ == 'function':
 | 
						|
            #
 | 
						|
            # The pointer is in the code provided by the user and loaded in memory
 | 
						|
            #
 | 
						|
            _pointer = _key
 | 
						|
            _key = 'inline@'+_key.__name__
 | 
						|
            # self._names.append(_key.__name__)
 | 
						|
        else:
 | 
						|
            _pointer = self._registry.get(key=_key)
 | 
						|
 | 
						|
        if _pointer  :
 | 
						|
            self._modules[_key] = _pointer
 | 
						|
            self._names.append(_key)
 | 
						|
        
 | 
						|
    def isplugin(self,module,name):
 | 
						|
        """
 | 
						|
        This function determines if a module is a recognized plugin
 | 
						|
        :module     module object loaded from importlib
 | 
						|
        :name       name of the functiion of interest
 | 
						|
        """
 | 
						|
        
 | 
						|
        p = type(getattr(module,name)).__name__ =='function'
 | 
						|
        q = hasattr(getattr(module,name),'transport')
 | 
						|
        #
 | 
						|
        # @TODO: add a generated key, and more indepth validation
 | 
						|
        return p and q
 | 
						|
    def has(self,_name):
 | 
						|
        """
 | 
						|
        This will determine if the module name is loaded or not
 | 
						|
        """
 | 
						|
        return _name in self._modules 
 | 
						|
    def ratio (self):
 | 
						|
        """
 | 
						|
        This functiion determines how many modules loaded vs unloaded given the list of names
 | 
						|
        """
 | 
						|
 | 
						|
        _n = len(self._names)
 | 
						|
        return len(set(self._modules.keys()) & set (self._names)) / _n
 | 
						|
    def apply(self,_data,_logger=[]):
 | 
						|
        _input= {}
 | 
						|
        
 | 
						|
        for _name in self._modules :
 | 
						|
            try:
 | 
						|
                _input = {'action':'plugin','object':_name,'input':{'status':'PASS'}}
 | 
						|
                _pointer = self._modules[_name]
 | 
						|
                if type(_data) == list :
 | 
						|
                    _data = pd.DataFrame(_data)
 | 
						|
                _brow,_bcol = list(_data.shape) 
 | 
						|
                
 | 
						|
                #
 | 
						|
                # @TODO: add exception handling
 | 
						|
                _data = _pointer(_data)
 | 
						|
                
 | 
						|
                _input['input']['shape'] = {'rows-dropped':_brow - _data.shape[0]}
 | 
						|
            except Exception as e:
 | 
						|
                _input['input']['status'] = 'FAILED'
 | 
						|
                print (e)
 | 
						|
            time.sleep(1)
 | 
						|
            if _logger:
 | 
						|
                try:
 | 
						|
                    _logger(**_input)
 | 
						|
                except Exception as e:
 | 
						|
                    pass    
 | 
						|
        return _data
 | 
						|
    # def apply(self,_data,_name):
 | 
						|
    #     """
 | 
						|
    #     This function applies an external module function against the data.
 | 
						|
    #     The responsibility is on the plugin to properly return data, thus responsibility is offloaded
 | 
						|
    #     """
 | 
						|
    #     try:
 | 
						|
            
 | 
						|
    #         _pointer = self._modules[_name]
 | 
						|
    #         _data = _pointer(_data)
 | 
						|
 | 
						|
    #     except Exception as e:
 | 
						|
    #         pass
 | 
						|
    #     return _data
 |