From 1868dcfb482626a72fc1ce082630f5bb06a2b54a Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Tue, 25 Jun 2024 09:52:05 -0500 Subject: [PATCH] bug fix: bundled qcms and cms to run as a single unit --- bin/qcms | 340 ++++++++++++++++++++++++++++++++++++++++++++++++ cms/__init__.py | 340 ++++++++++++++++++++++++------------------------ index.py | 3 + 3 files changed, 513 insertions(+), 170 deletions(-) create mode 100755 bin/qcms diff --git a/bin/qcms b/bin/qcms new file mode 100755 index 0000000..74042ec --- /dev/null +++ b/bin/qcms @@ -0,0 +1,340 @@ +#!/usr/bin/env python +import numpy as np +import os +import sys +import json +import importlib +import importlib.util +from git.repo.base import Repo +import typer +from typing_extensions import Annotated +from typing import Optional +import meta +import uuid +from termcolor import colored +import base64 +import io +from cms import index + +start = index.start + +__doc__ = """ +(c) 2022 Quick Content Management System - QCMS +Health Information Privacy Lab - Vanderbilt University Medical Center +MIT License + +Is a Python/Flask template for creating websites using disk structure to build the site and loading custom code +The basic template for a flask application will be created in a specified location, within this location will be found a folder "content" + - within the folder specify the folder structure that constitute the menu +Usage : + qcms --create --version --title --subtitle <subtitle> +""" + +PASSED = ' '.join(['[',colored(u'\u2713', 'green'),']']) +FAILED= ' '.join(['[',colored(u'\u2717','red'),']']) + +INVALID_FOLDER = """ +{FAILED} Unable to proceed, could not find project manifest. It should be qcms-manifest.json +""" +# +# handling cli interface +cli = typer.Typer() + +def _get_config (path) : + if os.path.exists(path) : + f = open(path) + _conf = json.loads(f.read()) + f.close() + else: + _conf = {} + return _conf +def _isvalid(_allowed,**_args): + + if not list(set(_allowed) - set(_args.keys())) : + _pargs = {} + for key in _allowed : + _pargs [key] = _args[key] + return _pargs + return False + +def write_config(_config, path): + f = open(path,'w') + f.write( json.dumps(_config)) ; + f.close() + +def _system(**_args): + """ + Both version and context must be provided otherwise they are ignored + :version + :context context + """ + _info = _isvalid(['version','context'],**_args) + _info['theme'] = 'default.css' + _info = _info if _info else {'version':'0.0.0','context':'','theme':'default.css'} + if _info : + _info['logo'] = None if 'logo' not in _args else _args['logo'] + # + # There is an aggregation entry here in app + _appInfo = {'debug':True,'port':8084,'threaded':True,'host':'0.0.0.0'} + if 'app' not in _args: + _info['app'] = _appInfo + else: + _info['app'] = dict(_appInfo,**_args['app']) + + return _info +def _header(**_args): + return _isvalid(['logo','title','subtitle'],**_args) +def _layout(**_args): + _info = _isvalid(['root','index'],**_args) + _info['on'] = {"load":{},"error":{}} + _url = 'qcms.co' + _overwrite = {"folder1":{"type":"redirect","url":_url},"folder2":{"type":"dialog"},"folder3":{"type":"dialog","url":_url}} + _info['icons'] = {"comment":"use folder names as keys and fontawesome type as values to add icons to menu"} + _info["api"] = {"comment":"use keys as uri and function calls as values"} + _info['map'] = {}, + _info['order'] = {'menu':[]} + _info['overwrite'] = _overwrite + return _info + +def make (**_args): + """ + :context + :port port to be served + + :title title of the application + :root folder of the content to be scanned + """ + if 'port' in _args : + _args['app'] = {'port':_args['port']} + del _args['port'] + if 'context' not in _args : + _args['context'] = '' + _info ={'system': _system(**_args)} + _hargs = {'title':'QCMS'} if 'title' not in _args else {'title':_args['title']} + _hargs['subtitle'] = '' if 'subtitle' not in _args else _args['subtitle'] + _hargs['logo'] = True + + + + + _info['layout'] = _layout(root=_args['root'],index='index.html') + _info['layout']['header'] = _header(**_hargs) + # + + + if 'footer' in _args : + _info['layout']['footer'] = [{'text':_args['footer']}] if type(_args['footer']) != list else [{'text':term} for term in _args['footeer']] + else: + _info['layout']['footer'] = [{'text':'Vanderbilt University Medical Center'}] + + return _info + +@cli.command(name="info") +def _info(): + """ + This function returns metadata information about this program, no parameters are needed + """ + print () + print (meta.__name__,meta.__version__) + print (meta.__author__,meta.__email__) + print () + +@cli.command(name='set-app') +# def set_app (host:str="0.0.0.0",context:str="",port:int=8084,debug=True): +def set_app (host:Annotated[str,typer.Argument(help="bind host IP address")]="0.0.0.0", + context:Annotated[str,typer.Argument(help="if behind a proxy server (no forward slash needed)")]="", + port:Annotated[int,typer.Argument(help="port on which to run the application")]=8084, + debug:Annotated[bool,typer.Argument(help="set debug mode on|off")]=True): + """ + This function consists in editing application access i.e port, debug, and/or context + + :context alias or path for applications living behind proxy server + """ + global INVALID_FOLDER + _config = _get_config() + if _config : + _app = _config['system']['app'] + _app['host'] = host + _app['port'] = port + _app['debug'] = debug + _config['system']['context'] = context + _config['app'] = _app + write_config(_config) + _msg = f"""{PASSED} Successful update, good job ! + """ + else: + _msg = INVALID_FOLDER + print (_msg) +@cli.command(name='set-cloud') +# def set_cloud(path:str): #url:str,uid:str,token:str): +def set_cloud(path:Annotated[str,typer.Argument(help="path of the auth-file for the cloud")]): #url:str,uid:str,token:str): + """ + This function overwrites or sets token information for nextcloud. This function will load the content from the cloud instead of locally + + :url url of the nextcloud + :uid account identifier + :token token to be used to signin + """ + if os.path.exists(path): + f = open (path) + _auth = json.loads(f.read()) + if not set(['url','token','uid']) - set(_auth.keys()) : + url = _auth['url'] + + if os.path.exists('qcms-manifest.json') : + _config = _get_config() + _config['system']['source'] = path #{'id':'cloud','auth':{'url':url,'uid':uid,'token':token}} + write_config(_config) + title = _config['layout']['header']['title'] + _msg = f""" + Successfully update, good job! + {url} + """ + else: + _msg = INVALID_FOLDER + else: + _msg = """NOT A VALID NEXTCLOUD FILE""" + else: + _msg = INVALID_FOLDER + print (_msg) +@cli.command(name='set-key') +# def set_key(path): +def set_key( + manifest:Annotated[str,typer.Argument(help="path of the manifest file")], + keyfile:Annotated[str,typer.Argument(help="path of the key file to generate")]): + """ + create a security key to control the application during developement. The location must have been created prior. + + """ + if not os.path.exists(keyfile): + f = open(keyfile,'w') + f.write(str(uuid.uuid4())) + f.close() + # + _config = _get_config(manifest) + if 'source' not in _config['system']: + _config['system']['source'] = {'id':'disk'} + _config['system']['source']['key'] = keyfile + write_config(_config,manifest) + _msg = f"""{PASSED} A key was generated and written to {keyfile} + use this key in header to enable reload of the site ...` + """ + else: + _msg = f"""{FAILED} Could NOT generate a key, because it would seem you already have one + Please manually delete {keyfile} + + + """ + print (_msg) + +def load(**_args): + """ + This function will load external module form a given location and return a pointer to a function in a given module + :path absolute path of the file (considered plugin) to be loaded + :name name of the function to be applied + """ + _path = _args['path'] #os.sep.join([_args['root'],'plugin']) + if os.path.isdir(_path): + files = os.listdir(_path) + if files : + files = [name for name in files if name.endswith('.py')] + if files: + _path = os.sep.join([_path,files[0]]) + else: + return None + else: + return None + #-- We have a file ... + _name = _args['name'] + spec = importlib.util.spec_from_file_location(_name, _path) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + return getattr(module,_name) if hasattr(module,_name) else None + +@cli.command (name='create') +# def create(folder:str,root:str,index:str='index.html',title:str='qcms',subtitle:str='',footer:str='Quick Content Management System',version:str='0.1'): +def create(folder:Annotated[str,typer.Argument(help="path of the project folder")], + root:Annotated[str,typer.Argument(help="folder of the content (inside the project folder)")], + index:str=typer.Option(default="index.html",help="index file (markup or html)"), #Annotated[str,typer.Argument(help="index file to load (.html or markup)")]='index.html', + title:str=typer.Option(default="QCMS PROJECT", help="name of the project") , #Annotated[str,typer.Argument(help="title of the project")]='QCMS - TITLE', + subtitle:str=typer.Option(default="designed & built by The Phi Technology",help="tag line about the project (sub-title)"), #Annotated[str,typer.Argument(help="subtitle of the project")]='qcms - subtitle', + footer:str=typer.Option(default="Quick Content Management System",help="text to be placed as footer"), #Annotated[str,typer.Argument(help="text on the footer of the main page")]='Quick Content Management System', + version:str=typer.Option(default="0.2",help="version number") #Annotated[str,typer.Argument(help="version of the site")]='0.1' + ): + """ + This function will create a project folder by performing a git clone (pull the template project) + and adding a configuration file to that will bootup the project + + + folder project folder + + root name/location of root folder associated with the web application + + @TODO: + - add port, + """ + #global SYS_ARGS + #folder = SYS_ARGS['create'] + + url = "https://dev.the-phi.com/git/cloud/cms.git" + #if 'url' in SYS_ARGS : + # url = SYS_ARGS['url'] + print ("\tcreating project into folder " +folder) + # + # if standalone, then we should create the git repository + # if standalone : + # _repo = Repo.clone_from(url,folder) + # # + # # removing remote folder associated with the project + # _repo.delete_remote(remote='origin') + # else: + # os.makedirs(folder) + + + # + # @TODO: root should be handled in + rootpath = os.sep.join([folder,root]) + _img = None + if not os.path.exists(rootpath) : + print (f"{PASSED} Creating content folder "+rootpath) + os.makedirs(rootpath) + os.makedirs(os.sep.join([rootpath,'_images'])) + _img = """""" + _,_img = _img.split(',',1) + logo = os.sep.join([rootpath,'_images','logo.png']) + f = open(logo,'wb') + f.write(base64.b64decode(_img)) + f.close() + + _html = __doc__.replace("<","<").replace(">",">").replace("Usage","<br>Usage") + + if not os.path.exists(os.sep.join([rootpath,'index.html'])) : + f = open(os.sep.join([rootpath,'index.html']),'w') + f.write(_html) + f.close() + print (f'{PASSED} configuration being written to project folder') + _args = {'title':title,'subtitle':subtitle,'version':version,'root':root,'index':index} + _config = make(**_args) + # + # updating logo reference (default for now) + _config['system']['logo'] = os.sep.join([_config['layout']['root'],'_images/logo.png']) + + + f = open(os.sep.join([folder,'qcms-manifest.json']),'w') + f.write(json.dumps(_config)) + f.close() + return _config +@cli.command(name="bootup") +def bootup (path:Annotated[str,typer.Argument(help="path of the manifest file")]='qcms-manifest.json'): + """ + This function will launch a site/project given the location of the manifest file + """ + index.start(path) + +def reset(): + """ + """ + global SYS_ARGS +if __name__ == '__main__': + cli() diff --git a/cms/__init__.py b/cms/__init__.py index 98795ba..6b503fb 100644 --- a/cms/__init__.py +++ b/cms/__init__.py @@ -6,192 +6,192 @@ import copy from jinja2 import Environment, BaseLoader, FileSystemLoader import importlib import importlib.util -from cms import disk, cloud, engine +# from cms import disk, cloud, engine # import cloud - -class components : - # @staticmethod - # def folders (_path): - # """ - # This function reads the content of a folder (no depth, it must be simple) - # """ - # _content = os.listdir(_path) - # return [_name for _name in _content if os.path.isdir(os.sep.join([_path,_name])) if not _name.startswith('_')] - # @staticmethod - # def content (_folder) : - # if os.path.exists(_folder) : - # # return [{'text':_name.split('.')[0].replace('_', ' ').replace('-',' ').strip(),'uri': os.sep.join([_folder,_name])} for _name in os.listdir(_folder) if not _name.startswith('_') and os.path.isfile( os.sep.join([_folder,_name]))] - # return [{'text':_name.split('.')[0].replace('_', ' ').replace('-',' ').strip(),'uri': os.sep.join([_folder,_name])} for _name in os.listdir(_folder) if not _name.startswith('_') and os.path.isfile( os.sep.join([_folder,_name]))] - # else: - # return [] - @staticmethod - def menu(_config): - """ - This function will read menu and sub-menu items from disk structure, - The files are loaded will - """ - # _items = components.folders(_path) +import index +# class components : +# # @staticmethod +# # def folders (_path): +# # """ +# # This function reads the content of a folder (no depth, it must be simple) +# # """ +# # _content = os.listdir(_path) +# # return [_name for _name in _content if os.path.isdir(os.sep.join([_path,_name])) if not _name.startswith('_')] +# # @staticmethod +# # def content (_folder) : +# # if os.path.exists(_folder) : +# # # return [{'text':_name.split('.')[0].replace('_', ' ').replace('-',' ').strip(),'uri': os.sep.join([_folder,_name])} for _name in os.listdir(_folder) if not _name.startswith('_') and os.path.isfile( os.sep.join([_folder,_name]))] +# # return [{'text':_name.split('.')[0].replace('_', ' ').replace('-',' ').strip(),'uri': os.sep.join([_folder,_name])} for _name in os.listdir(_folder) if not _name.startswith('_') and os.path.isfile( os.sep.join([_folder,_name]))] +# # else: +# # return [] +# @staticmethod +# def menu(_config): +# """ +# This function will read menu and sub-menu items from disk structure, +# The files are loaded will +# """ +# # _items = components.folders(_path) - # _layout = copy.deepcopy(_config['layout']) - # _overwrite = _layout['overwrite'] if 'overwrite' in _layout else {} - # - # content of each menu item - # _subItems = [ components.content (os.sep.join([_path,_name]))for _name in _items ] - # if 'map' in _layout : - # _items = [_name if _name not in _layout['map'] else _layout['map'][_name] for _name in _items] +# # _layout = copy.deepcopy(_config['layout']) +# # _overwrite = _layout['overwrite'] if 'overwrite' in _layout else {} +# # +# # content of each menu item +# # _subItems = [ components.content (os.sep.join([_path,_name]))for _name in _items ] +# # if 'map' in _layout : +# # _items = [_name if _name not in _layout['map'] else _layout['map'][_name] for _name in _items] - # _object = dict(zip(_items,_subItems)) +# # _object = dict(zip(_items,_subItems)) - if 'source' in _config['system'] and _config['system']['source']['id'] == 'cloud' : - _sourceHandler = cloud - else: - _sourceHandler = disk - _object = _sourceHandler.build(_config) - # _object = disk.build(_path,_config) if type(_path) == str else cloud.build(_path,_config) - _layout = copy.deepcopy(_config['layout']) - _overwrite = _layout['overwrite'] if 'overwrite' in _layout else {} +# if 'source' in _config['system'] and _config['system']['source']['id'] == 'cloud' : +# _sourceHandler = cloud +# else: +# _sourceHandler = disk +# _object = _sourceHandler.build(_config) +# # _object = disk.build(_path,_config) if type(_path) == str else cloud.build(_path,_config) +# _layout = copy.deepcopy(_config['layout']) +# _overwrite = _layout['overwrite'] if 'overwrite' in _layout else {} - # - # @TODO: Find a way to translate rename/replace keys of the _object (menu) here - # - #-- applying overwrites to the menu items - for _name in _object : - _submenu = _object[_name] - _index = 0 - for _item in _submenu : - text = _item['text'].strip() +# # +# # @TODO: Find a way to translate rename/replace keys of the _object (menu) here +# # +# #-- applying overwrites to the menu items +# for _name in _object : +# _submenu = _object[_name] +# _index = 0 +# for _item in _submenu : +# text = _item['text'].strip() - if text in _overwrite : - if 'uri' in _item and 'url' in 'url' in _overwrite[text] : - del _item['uri'] - _item = dict(_item,**_overwrite[text]) - if 'uri' in _item: - _item['uri'] = _item['uri'].replace(_layout['root'],'') - _submenu[_index] = _item - _index += 1 - return _object +# if text in _overwrite : +# if 'uri' in _item and 'url' in 'url' in _overwrite[text] : +# del _item['uri'] +# _item = dict(_item,**_overwrite[text]) +# if 'uri' in _item: +# _item['uri'] = _item['uri'].replace(_layout['root'],'') +# _submenu[_index] = _item +# _index += 1 +# return _object - @staticmethod - def html(uri,id,_args={},_system={}) : - """ - This function reads a given uri and returns the appropriate html document, and applies environment context +# @staticmethod +# def html(uri,id,_args={},_system={}) : +# """ +# This function reads a given uri and returns the appropriate html document, and applies environment context - """ +# """ - if 'source' in _system and _system['source']['id'] == 'cloud': - _html = cloud.html(uri,dict(_args,**{'system':_system})) +# if 'source' in _system and _system['source']['id'] == 'cloud': +# _html = cloud.html(uri,dict(_args,**{'system':_system})) - else: - _html = disk.html(uri) - # _html = (open(uri)).read() +# else: +# _html = disk.html(uri) +# # _html = (open(uri)).read() - #return ' '.join(['<div id=":id" class=":id">'.replace(':id',id),_html,'</div>']) - _html = ' '.join(['<div id=":id" class=":id">'.replace(':id',id),_html,'</div>']) - appContext = Environment(loader=BaseLoader()).from_string(_html) - # - # If the rendering of the HTML happens here we should plugin custom functions (at the very least) - # +# #return ' '.join(['<div id=":id" class=":id">'.replace(':id',id),_html,'</div>']) +# _html = ' '.join(['<div id=":id" class=":id">'.replace(':id',id),_html,'</div>']) +# appContext = Environment(loader=BaseLoader()).from_string(_html) +# # +# # If the rendering of the HTML happens here we should plugin custom functions (at the very least) +# # - return appContext.render(**_args) - # return _html - @staticmethod - def data (_args): - """ - :store data-store parameters (data-transport, github.com/lnyemba/data-transport) - :query query to be applied against the store (expected data-frame) - """ - _store = _args['store'] - reader = transport.factory.instance(**_store) - _queries= copy.deepcopy(_store['query']) - _data = reader.read(**_queries) - return _data - @staticmethod - def csv(uri) : - return pd.read(uri).to_html() - @staticmethod - def load_plugin(**_args): - """ - This function will load external module form a given location and return a pointer to a function in a given module - :path absolute path of the file (considered plugin) to be loaded - :name name of the function to be applied - """ - _path = _args['path'] #os.sep.join([_args['root'],'plugin']) - if os.path.isdir(_path): - files = os.listdir(_path) - if files : - files = [name for name in files if name.endswith('.py')] - if files: - _path = os.sep.join([_path,files[0]]) - else: - return None - else: - return None - #-- We have a file ... - _name = _args['name'] +# return appContext.render(**_args) +# # return _html +# @staticmethod +# def data (_args): +# """ +# :store data-store parameters (data-transport, github.com/lnyemba/data-transport) +# :query query to be applied against the store (expected data-frame) +# """ +# _store = _args['store'] +# reader = transport.factory.instance(**_store) +# _queries= copy.deepcopy(_store['query']) +# _data = reader.read(**_queries) +# return _data +# @staticmethod +# def csv(uri) : +# return pd.read(uri).to_html() +# @staticmethod +# def load_plugin(**_args): +# """ +# This function will load external module form a given location and return a pointer to a function in a given module +# :path absolute path of the file (considered plugin) to be loaded +# :name name of the function to be applied +# """ +# _path = _args['path'] #os.sep.join([_args['root'],'plugin']) +# if os.path.isdir(_path): +# files = os.listdir(_path) +# if files : +# files = [name for name in files if name.endswith('.py')] +# if files: +# _path = os.sep.join([_path,files[0]]) +# else: +# return None +# else: +# return None +# #-- We have a file ... +# _name = _args['name'] - spec = importlib.util.spec_from_file_location(_name, _path) - module = importlib.util.module_from_spec(spec) - spec.loader.exec_module(module) +# spec = importlib.util.spec_from_file_location(_name, _path) +# module = importlib.util.module_from_spec(spec) +# spec.loader.exec_module(module) - return getattr(module,_name) if hasattr(module,_name) else None - @staticmethod - def plugins(_config) : - """ - This function looks for plugins in the folder on disk (no cloud support) and attempts to load them - """ - PATH= os.sep.join([_config['layout']['root'],'_plugins']) - _map = {} - # if not os.path.exists(PATH) : - # return _map - if 'plugins' not in _config : - _config['plugins'] = {} - _conf = _config['plugins'] +# return getattr(module,_name) if hasattr(module,_name) else None +# @staticmethod +# def plugins(_config) : +# """ +# This function looks for plugins in the folder on disk (no cloud support) and attempts to load them +# """ +# PATH= os.sep.join([_config['layout']['root'],'_plugins']) +# _map = {} +# # if not os.path.exists(PATH) : +# # return _map +# if 'plugins' not in _config : +# _config['plugins'] = {} +# _conf = _config['plugins'] - for _key in _conf : +# for _key in _conf : - _path = os.sep.join([PATH,_key+".py"]) - if not os.path.exists(_path): - continue - for _name in _conf[_key] : - _pointer = components.load_plugin(path=_path,name=_name) - if _pointer : - _uri = "/".join(["api",_key,_name]) - _map[_uri] = _pointer - # - # We are adding some source specific plugins to the user-defined plugins - # This is intended to have out-of the box plugins... - # - if 'source' in _config['system'] and _config['system']['source']['id'] == 'cloud' : - _plugins = cloud.plugins() - else: - _plugins = disk.plugins() - # - # If there are any plugins found, we should load them and use them +# _path = os.sep.join([PATH,_key+".py"]) +# if not os.path.exists(_path): +# continue +# for _name in _conf[_key] : +# _pointer = components.load_plugin(path=_path,name=_name) +# if _pointer : +# _uri = "/".join(["api",_key,_name]) +# _map[_uri] = _pointer +# # +# # We are adding some source specific plugins to the user-defined plugins +# # This is intended to have out-of the box plugins... +# # +# if 'source' in _config['system'] and _config['system']['source']['id'] == 'cloud' : +# _plugins = cloud.plugins() +# else: +# _plugins = disk.plugins() +# # +# # If there are any plugins found, we should load them and use them - if _plugins : - _map = dict(_map,**_plugins) - return _map - @staticmethod - def context(_config): - """ - adding custom variables functions to Jinja2, this function should be called after plugins are loaded - """ - _plugins = _config['plugins'] - # if not location: - # env = Environment(loader=BaseLoader()) - # else: - location = _config['layout']['root'] - # env = Environment(loader=FileSystemLoader(location)) - env = Environment(loader=BaseLoader()) - # env.globals['routes'] = _config['plugins'] - return env - @staticmethod - def get_system(_config,skip_keys=[]): - _system = copy.deepcopy(_config['system']) - if skip_keys : - for key in skip_keys : - if key in _system : - del _system - return _system +# if _plugins : +# _map = dict(_map,**_plugins) +# return _map +# @staticmethod +# def context(_config): +# """ +# adding custom variables functions to Jinja2, this function should be called after plugins are loaded +# """ +# _plugins = _config['plugins'] +# # if not location: +# # env = Environment(loader=BaseLoader()) +# # else: +# location = _config['layout']['root'] +# # env = Environment(loader=FileSystemLoader(location)) +# env = Environment(loader=BaseLoader()) +# # env.globals['routes'] = _config['plugins'] +# return env +# @staticmethod +# def get_system(_config,skip_keys=[]): +# _system = copy.deepcopy(_config['system']) +# if skip_keys : +# for key in skip_keys : +# if key in _system : +# del _system +# return _system diff --git a/index.py b/index.py index 131371c..2818014 100644 --- a/index.py +++ b/index.py @@ -23,6 +23,9 @@ import pandas as pd import uuid import datetime + +from cms import disk, cloud, engine + _app = Flask(__name__) cli = typer.Typer() # @_app.route('/favicon.ico')