from genericpath import isdir
import os
import copy
import functools
import importlib
import importlib.util
import json

import pandas as pd
from jinja2 import Environment, BaseLoader, FileSystemLoader

from . import sites
from . import apexchart
from . import meta
from . import secure

# def get_manifest (manifest):
#     if not manifest.endswith('json') and os.path.isdir(manifest):
#         manifest = manifest if not manifest.endswith(os.sep) else os.sep.join(manifest.split(os.sep)[:-1])
#         return os.sep.join([manifest,'qcms-manifest.json'])
#     else:
#         return manifest


class Plugin:
    """
    Decorator for plugin functions.

    Tags the decorated function with two attributes that ``Plugin.call`` reads
    when dispatching a request:

    * ``mimetype`` -- the content type reported for the function's output
      (required keyword argument).
    * ``method``   -- the list of request methods the function accepts
      (optional keyword argument, defaults to ``['POST', 'GET']``).
    """

    def __init__(self, **_args):
        """
        :param mimetype: content type attached to the decorated function
        :param method:   a single method or a collection of methods; a single
                         value is wrapped into a one-element list
        :raises KeyError: when ``mimetype`` is not provided
        """
        self._mimetype = _args['mimetype']
        _method = _args.get('method', ['POST', 'GET'])
        if isinstance(_method, (list, tuple, set, frozenset)):
            # Copy into a plain list so the decorator never shares (or is
            # confused by the type of) the caller's collection.
            _method = list(_method)
        else:
            _method = [_method]
        self._method = _method

    def __call__(self, _callback):
        """
        Wrap ``_callback`` and attach the ``method`` / ``mimetype`` attributes.

        The wrapper only accepts keyword arguments and forwards them as-is.
        """
        @functools.wraps(_callback)  # keep the plugin's own name/docstring
        def wrapper(**_args):
            return _callback(**_args)

        wrapper.method = self._method
        wrapper.mimetype = self._mimetype
        return wrapper

    @staticmethod
    def call(**_args):
        """
        Execute the plugged-in function registered under a URI.

        :param handler: object exposing ``system()``, ``plugins()`` and ``config()``
        :param request: request object; its ``method`` attribute is compared
                        against the methods declared by the plugin
        :param uri:     key of the plugin to run (prefixed with the handler's
                        context when one is set)
        :return: a ``(data, status_code, headers)`` tuple where ``headers`` is
                 ``{'Content-Type': <mimetype>}``. ``status_code`` is 200 when
                 the plugin was found and 404 otherwise.

        DataFrames are serialised with ``to_json(orient='records')`` and
        dicts/lists with ``json.dumps``; any other value is returned untouched.
        Exceptions raised by the plugin itself propagate to the caller.
        """
        _handler = _args['handler']
        _request = _args['request']
        _uri = _args['uri']
        #
        # we need to update the _uri (if context/embedded apps)
        #
        _context = _handler.system()['context']
        if _context:
            _uri = f'{_context}/{_uri}'

        _plugins = _handler.plugins()
        if _uri not in _plugins:
            #
            # We should generate a 500 error in this case with a message ...
            #
            _data = f""" """
            # 'text/html' is the registered media type ('plain/html' is not).
            return _data, 404, {'Content-Type': 'text/html'}

        _config = _handler.config()
        _pointer = _plugins[_uri]
        if hasattr(_pointer, 'mimetype') and _request.method in _pointer.method:
            #
            # Decorated plugin invoked with one of its declared methods: the
            # declared mimetype describes the output.
            #
            _mimeType = _pointer.mimetype
            _data = _pointer(request=_request, config=_config)
        else:
            #
            # Undecorated plugin (or undeclared method): the plugin may return
            # either ``data`` or a ``(data, mimetype)`` tuple.
            # NOTE(review): a decorated plugin called with an undeclared method
            # still runs here instead of being rejected -- confirm intended.
            #
            _mimeType = 'application/octet-stream'
            # The plugin is invoked exactly once; only a genuine 2-tuple is
            # unpacked, so a 2-item list/dict/DataFrame is treated as data.
            _result = _pointer(request=_request, config=_config)
            if isinstance(_result, tuple) and len(_result) == 2:
                _data, _mimeType = _result
            else:
                _data = _result

        if isinstance(_data, pd.DataFrame):
            _data = _data.to_json(orient='records')
        elif isinstance(_data, (dict, list)):
            _data = json.dumps(_data)
        return _data, 200, {'Content-Type': _mimeType}


@Plugin(mimetype="application/json")
def authorizationURL(**_args):
    """
    Build the login URL for the configured authentication provider.

    Reads ``_args['config']['system']['source']``. When it has a ``secure``
    entry, the JSON file at ``secure['path']`` is loaded; if its ``method`` is
    one of ``oauth2`` / ``oauth20`` / ``oauth2.0`` the URL is assembled from
    ``authorization_url`` plus every other key (except ``method``) as a query
    parameter.

    :return: ``{"url": ..., "label": "Login with <provider>"}`` for an OAuth2
             configuration, otherwise an empty dict.
    """
    _source = _args['config']['system']['source']
    if 'secure' not in _source:
        return {}

    _secure = _source['secure']
    # Context manager guarantees the configuration file is closed.
    with open(_secure['path'], encoding='utf-8') as _file:
        _config = json.load(_file)

    if _config['method'] not in ('oauth2', 'oauth20', 'oauth2.0'):
        # Always hand back a dict so the JSON response is never ``None``.
        return {}

    # NOTE(review): values are inserted verbatim (not URL-encoded) and every
    # key of the file other than the two excluded ones ends up in the URL --
    # confirm the file holds only pre-encoded, non-secret values.
    _query = '&'.join(
        f"{_key}={_config[_key]}"
        for _key in _config
        if _key not in ('method', 'authorization_url')
    )
    _url = _config['authorization_url'] + '?' + _query
    return {"url": _url, "label": f"Login with {_secure['provider']}"}


@Plugin(mimetype="text/html")
def oauthFinalize(**_args):
    """
    Return the static HTML shown while the OAuth flow is being finalised.

    The keyword arguments are accepted for plugin-interface compatibility but
    are not used.
    """
    _html = """
Please wait ...
"""
    return _html