bug fix: bundled qcms and cms to run as a single unit

v2.0
Steve Nyemba 7 months ago
parent af694a3dd3
commit 1868dcfb48

@ -0,0 +1,340 @@
#!/usr/bin/env python
import numpy as np
import os
import sys
import json
import importlib
import importlib.util
from git.repo.base import Repo
import typer
from typing_extensions import Annotated
from typing import Optional
import meta
import uuid
from termcolor import colored
import base64
import io
from cms import index
start = index.start
__doc__ = """
(c) 2022 Quick Content Management System - QCMS
Health Information Privacy Lab - Vanderbilt University Medical Center
MIT License
Is a Python/Flask template for creating websites using disk structure to build the site and loading custom code
The basic template for a flask application will be created in a specified location, within this location will be found a folder "content"
- within the folder specify the folder structure that constitute the menu
Usage :
qcms --create <path> --version <value> --title <title> --subtitle <subtitle>
"""
PASSED = ' '.join(['[',colored(u'\u2713', 'green'),']'])
FAILED= ' '.join(['[',colored(u'\u2717','red'),']'])
INVALID_FOLDER = """
{FAILED} Unable to proceed, could not find project manifest. It should be qcms-manifest.json
"""
#
# handling cli interface
cli = typer.Typer()
def _get_config (path) :
if os.path.exists(path) :
f = open(path)
_conf = json.loads(f.read())
f.close()
else:
_conf = {}
return _conf
def _isvalid(_allowed,**_args):
if not list(set(_allowed) - set(_args.keys())) :
_pargs = {}
for key in _allowed :
_pargs [key] = _args[key]
return _pargs
return False
def write_config(_config, path):
f = open(path,'w')
f.write( json.dumps(_config)) ;
f.close()
def _system(**_args):
"""
Both version and context must be provided otherwise they are ignored
:version
:context context
"""
_info = _isvalid(['version','context'],**_args)
_info['theme'] = 'default.css'
_info = _info if _info else {'version':'0.0.0','context':'','theme':'default.css'}
if _info :
_info['logo'] = None if 'logo' not in _args else _args['logo']
#
# There is an aggregation entry here in app
_appInfo = {'debug':True,'port':8084,'threaded':True,'host':'0.0.0.0'}
if 'app' not in _args:
_info['app'] = _appInfo
else:
_info['app'] = dict(_appInfo,**_args['app'])
return _info
def _header(**_args):
return _isvalid(['logo','title','subtitle'],**_args)
def _layout(**_args):
_info = _isvalid(['root','index'],**_args)
_info['on'] = {"load":{},"error":{}}
_url = 'qcms.co'
_overwrite = {"folder1":{"type":"redirect","url":_url},"folder2":{"type":"dialog"},"folder3":{"type":"dialog","url":_url}}
_info['icons'] = {"comment":"use folder names as keys and fontawesome type as values to add icons to menu"}
_info["api"] = {"comment":"use keys as uri and function calls as values"}
_info['map'] = {},
_info['order'] = {'menu':[]}
_info['overwrite'] = _overwrite
return _info
def make (**_args):
"""
:context
:port port to be served
:title title of the application
:root folder of the content to be scanned
"""
if 'port' in _args :
_args['app'] = {'port':_args['port']}
del _args['port']
if 'context' not in _args :
_args['context'] = ''
_info ={'system': _system(**_args)}
_hargs = {'title':'QCMS'} if 'title' not in _args else {'title':_args['title']}
_hargs['subtitle'] = '' if 'subtitle' not in _args else _args['subtitle']
_hargs['logo'] = True
_info['layout'] = _layout(root=_args['root'],index='index.html')
_info['layout']['header'] = _header(**_hargs)
#
if 'footer' in _args :
_info['layout']['footer'] = [{'text':_args['footer']}] if type(_args['footer']) != list else [{'text':term} for term in _args['footeer']]
else:
_info['layout']['footer'] = [{'text':'Vanderbilt University Medical Center'}]
return _info
@cli.command(name="info")
def _info():
"""
This function returns metadata information about this program, no parameters are needed
"""
print ()
print (meta.__name__,meta.__version__)
print (meta.__author__,meta.__email__)
print ()
@cli.command(name='set-app')
# def set_app (host:str="0.0.0.0",context:str="",port:int=8084,debug=True):
def set_app (host:Annotated[str,typer.Argument(help="bind host IP address")]="0.0.0.0",
context:Annotated[str,typer.Argument(help="if behind a proxy server (no forward slash needed)")]="",
port:Annotated[int,typer.Argument(help="port on which to run the application")]=8084,
debug:Annotated[bool,typer.Argument(help="set debug mode on|off")]=True):
"""
This function consists in editing application access i.e port, debug, and/or context
:context alias or path for applications living behind proxy server
"""
global INVALID_FOLDER
_config = _get_config()
if _config :
_app = _config['system']['app']
_app['host'] = host
_app['port'] = port
_app['debug'] = debug
_config['system']['context'] = context
_config['app'] = _app
write_config(_config)
_msg = f"""{PASSED} Successful update, good job !
"""
else:
_msg = INVALID_FOLDER
print (_msg)
@cli.command(name='set-cloud')
# def set_cloud(path:str): #url:str,uid:str,token:str):
def set_cloud(path:Annotated[str,typer.Argument(help="path of the auth-file for the cloud")]): #url:str,uid:str,token:str):
"""
This function overwrites or sets token information for nextcloud. This function will load the content from the cloud instead of locally
:url url of the nextcloud
:uid account identifier
:token token to be used to signin
"""
if os.path.exists(path):
f = open (path)
_auth = json.loads(f.read())
if not set(['url','token','uid']) - set(_auth.keys()) :
url = _auth['url']
if os.path.exists('qcms-manifest.json') :
_config = _get_config()
_config['system']['source'] = path #{'id':'cloud','auth':{'url':url,'uid':uid,'token':token}}
write_config(_config)
title = _config['layout']['header']['title']
_msg = f"""
Successfully update, good job!
{url}
"""
else:
_msg = INVALID_FOLDER
else:
_msg = """NOT A VALID NEXTCLOUD FILE"""
else:
_msg = INVALID_FOLDER
print (_msg)
@cli.command(name='set-key')
# def set_key(path):
def set_key(
manifest:Annotated[str,typer.Argument(help="path of the manifest file")],
keyfile:Annotated[str,typer.Argument(help="path of the key file to generate")]):
"""
create a security key to control the application during developement. The location must have been created prior.
"""
if not os.path.exists(keyfile):
f = open(keyfile,'w')
f.write(str(uuid.uuid4()))
f.close()
#
_config = _get_config(manifest)
if 'source' not in _config['system']:
_config['system']['source'] = {'id':'disk'}
_config['system']['source']['key'] = keyfile
write_config(_config,manifest)
_msg = f"""{PASSED} A key was generated and written to {keyfile}
use this key in header to enable reload of the site ...`
"""
else:
_msg = f"""{FAILED} Could NOT generate a key, because it would seem you already have one
Please manually delete {keyfile}
"""
print (_msg)
def load(**_args):
"""
This function will load external module form a given location and return a pointer to a function in a given module
:path absolute path of the file (considered plugin) to be loaded
:name name of the function to be applied
"""
_path = _args['path'] #os.sep.join([_args['root'],'plugin'])
if os.path.isdir(_path):
files = os.listdir(_path)
if files :
files = [name for name in files if name.endswith('.py')]
if files:
_path = os.sep.join([_path,files[0]])
else:
return None
else:
return None
#-- We have a file ...
_name = _args['name']
spec = importlib.util.spec_from_file_location(_name, _path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return getattr(module,_name) if hasattr(module,_name) else None
@cli.command (name='create')
# def create(folder:str,root:str,index:str='index.html',title:str='qcms',subtitle:str='',footer:str='Quick Content Management System',version:str='0.1'):
def create(folder:Annotated[str,typer.Argument(help="path of the project folder")],
root:Annotated[str,typer.Argument(help="folder of the content (inside the project folder)")],
index:str=typer.Option(default="index.html",help="index file (markup or html)"), #Annotated[str,typer.Argument(help="index file to load (.html or markup)")]='index.html',
title:str=typer.Option(default="QCMS PROJECT", help="name of the project") , #Annotated[str,typer.Argument(help="title of the project")]='QCMS - TITLE',
subtitle:str=typer.Option(default="designed & built by The Phi Technology",help="tag line about the project (sub-title)"), #Annotated[str,typer.Argument(help="subtitle of the project")]='qcms - subtitle',
footer:str=typer.Option(default="Quick Content Management System",help="text to be placed as footer"), #Annotated[str,typer.Argument(help="text on the footer of the main page")]='Quick Content Management System',
version:str=typer.Option(default="0.2",help="version number") #Annotated[str,typer.Argument(help="version of the site")]='0.1'
):
"""
This function will create a project folder by performing a git clone (pull the template project)
and adding a configuration file to that will bootup the project
folder project folder
root name/location of root folder associated with the web application
@TODO:
- add port,
"""
#global SYS_ARGS
#folder = SYS_ARGS['create']
url = "https://dev.the-phi.com/git/cloud/cms.git"
#if 'url' in SYS_ARGS :
# url = SYS_ARGS['url']
print ("\tcreating project into folder " +folder)
#
# if standalone, then we should create the git repository
# if standalone :
# _repo = Repo.clone_from(url,folder)
# #
# # removing remote folder associated with the project
# _repo.delete_remote(remote='origin')
# else:
# os.makedirs(folder)
#
# @TODO: root should be handled in
rootpath = os.sep.join([folder,root])
_img = None
if not os.path.exists(rootpath) :
print (f"{PASSED} Creating content folder "+rootpath)
os.makedirs(rootpath)
os.makedirs(os.sep.join([rootpath,'_images']))
_img = """"""
_,_img = _img.split(',',1)
logo = os.sep.join([rootpath,'_images','logo.png'])
f = open(logo,'wb')
f.write(base64.b64decode(_img))
f.close()
_html = __doc__.replace("<","&lt;").replace(">","&gt;").replace("Usage","<br>Usage")
if not os.path.exists(os.sep.join([rootpath,'index.html'])) :
f = open(os.sep.join([rootpath,'index.html']),'w')
f.write(_html)
f.close()
print (f'{PASSED} configuration being written to project folder')
_args = {'title':title,'subtitle':subtitle,'version':version,'root':root,'index':index}
_config = make(**_args)
#
# updating logo reference (default for now)
_config['system']['logo'] = os.sep.join([_config['layout']['root'],'_images/logo.png'])
f = open(os.sep.join([folder,'qcms-manifest.json']),'w')
f.write(json.dumps(_config))
f.close()
return _config
@cli.command(name="bootup")
def bootup (path:Annotated[str,typer.Argument(help="path of the manifest file")]='qcms-manifest.json'):
"""
This function will launch a site/project given the location of the manifest file
"""
index.start(path)
def reset():
"""
"""
global SYS_ARGS
if __name__ == '__main__':
cli()

@ -6,192 +6,192 @@ import copy
from jinja2 import Environment, BaseLoader, FileSystemLoader
import importlib
import importlib.util
from cms import disk, cloud, engine
# from cms import disk, cloud, engine
# import cloud
class components :
# @staticmethod
# def folders (_path):
# """
# This function reads the content of a folder (no depth, it must be simple)
# """
# _content = os.listdir(_path)
# return [_name for _name in _content if os.path.isdir(os.sep.join([_path,_name])) if not _name.startswith('_')]
# @staticmethod
# def content (_folder) :
# if os.path.exists(_folder) :
# # return [{'text':_name.split('.')[0].replace('_', ' ').replace('-',' ').strip(),'uri': os.sep.join([_folder,_name])} for _name in os.listdir(_folder) if not _name.startswith('_') and os.path.isfile( os.sep.join([_folder,_name]))]
# return [{'text':_name.split('.')[0].replace('_', ' ').replace('-',' ').strip(),'uri': os.sep.join([_folder,_name])} for _name in os.listdir(_folder) if not _name.startswith('_') and os.path.isfile( os.sep.join([_folder,_name]))]
# else:
# return []
@staticmethod
def menu(_config):
"""
This function will read menu and sub-menu items from disk structure,
The files are loaded will
"""
# _items = components.folders(_path)
import index
# class components :
# # @staticmethod
# # def folders (_path):
# # """
# # This function reads the content of a folder (no depth, it must be simple)
# # """
# # _content = os.listdir(_path)
# # return [_name for _name in _content if os.path.isdir(os.sep.join([_path,_name])) if not _name.startswith('_')]
# # @staticmethod
# # def content (_folder) :
# # if os.path.exists(_folder) :
# # # return [{'text':_name.split('.')[0].replace('_', ' ').replace('-',' ').strip(),'uri': os.sep.join([_folder,_name])} for _name in os.listdir(_folder) if not _name.startswith('_') and os.path.isfile( os.sep.join([_folder,_name]))]
# # return [{'text':_name.split('.')[0].replace('_', ' ').replace('-',' ').strip(),'uri': os.sep.join([_folder,_name])} for _name in os.listdir(_folder) if not _name.startswith('_') and os.path.isfile( os.sep.join([_folder,_name]))]
# # else:
# # return []
# @staticmethod
# def menu(_config):
# """
# This function will read menu and sub-menu items from disk structure,
# The files are loaded will
# """
# # _items = components.folders(_path)
# _layout = copy.deepcopy(_config['layout'])
# _overwrite = _layout['overwrite'] if 'overwrite' in _layout else {}
#
# content of each menu item
# _subItems = [ components.content (os.sep.join([_path,_name]))for _name in _items ]
# if 'map' in _layout :
# _items = [_name if _name not in _layout['map'] else _layout['map'][_name] for _name in _items]
# # _layout = copy.deepcopy(_config['layout'])
# # _overwrite = _layout['overwrite'] if 'overwrite' in _layout else {}
# #
# # content of each menu item
# # _subItems = [ components.content (os.sep.join([_path,_name]))for _name in _items ]
# # if 'map' in _layout :
# # _items = [_name if _name not in _layout['map'] else _layout['map'][_name] for _name in _items]
# _object = dict(zip(_items,_subItems))
# # _object = dict(zip(_items,_subItems))
if 'source' in _config['system'] and _config['system']['source']['id'] == 'cloud' :
_sourceHandler = cloud
else:
_sourceHandler = disk
_object = _sourceHandler.build(_config)
# _object = disk.build(_path,_config) if type(_path) == str else cloud.build(_path,_config)
_layout = copy.deepcopy(_config['layout'])
_overwrite = _layout['overwrite'] if 'overwrite' in _layout else {}
# if 'source' in _config['system'] and _config['system']['source']['id'] == 'cloud' :
# _sourceHandler = cloud
# else:
# _sourceHandler = disk
# _object = _sourceHandler.build(_config)
# # _object = disk.build(_path,_config) if type(_path) == str else cloud.build(_path,_config)
# _layout = copy.deepcopy(_config['layout'])
# _overwrite = _layout['overwrite'] if 'overwrite' in _layout else {}
#
# @TODO: Find a way to translate rename/replace keys of the _object (menu) here
#
#-- applying overwrites to the menu items
for _name in _object :
_submenu = _object[_name]
_index = 0
for _item in _submenu :
text = _item['text'].strip()
# #
# # @TODO: Find a way to translate rename/replace keys of the _object (menu) here
# #
# #-- applying overwrites to the menu items
# for _name in _object :
# _submenu = _object[_name]
# _index = 0
# for _item in _submenu :
# text = _item['text'].strip()
if text in _overwrite :
if 'uri' in _item and 'url' in 'url' in _overwrite[text] :
del _item['uri']
_item = dict(_item,**_overwrite[text])
if 'uri' in _item:
_item['uri'] = _item['uri'].replace(_layout['root'],'')
_submenu[_index] = _item
_index += 1
return _object
# if text in _overwrite :
# if 'uri' in _item and 'url' in 'url' in _overwrite[text] :
# del _item['uri']
# _item = dict(_item,**_overwrite[text])
# if 'uri' in _item:
# _item['uri'] = _item['uri'].replace(_layout['root'],'')
# _submenu[_index] = _item
# _index += 1
# return _object
@staticmethod
def html(uri,id,_args={},_system={}) :
"""
This function reads a given uri and returns the appropriate html document, and applies environment context
# @staticmethod
# def html(uri,id,_args={},_system={}) :
# """
# This function reads a given uri and returns the appropriate html document, and applies environment context
"""
# """
if 'source' in _system and _system['source']['id'] == 'cloud':
_html = cloud.html(uri,dict(_args,**{'system':_system}))
# if 'source' in _system and _system['source']['id'] == 'cloud':
# _html = cloud.html(uri,dict(_args,**{'system':_system}))
else:
_html = disk.html(uri)
# _html = (open(uri)).read()
# else:
# _html = disk.html(uri)
# # _html = (open(uri)).read()
#return ' '.join(['<div id=":id" class=":id">'.replace(':id',id),_html,'</div>'])
_html = ' '.join(['<div id=":id" class=":id">'.replace(':id',id),_html,'</div>'])
appContext = Environment(loader=BaseLoader()).from_string(_html)
#
# If the rendering of the HTML happens here we should plugin custom functions (at the very least)
#
# #return ' '.join(['<div id=":id" class=":id">'.replace(':id',id),_html,'</div>'])
# _html = ' '.join(['<div id=":id" class=":id">'.replace(':id',id),_html,'</div>'])
# appContext = Environment(loader=BaseLoader()).from_string(_html)
# #
# # If the rendering of the HTML happens here we should plugin custom functions (at the very least)
# #
return appContext.render(**_args)
# return _html
@staticmethod
def data (_args):
"""
:store data-store parameters (data-transport, github.com/lnyemba/data-transport)
:query query to be applied against the store (expected data-frame)
"""
_store = _args['store']
reader = transport.factory.instance(**_store)
_queries= copy.deepcopy(_store['query'])
_data = reader.read(**_queries)
return _data
@staticmethod
def csv(uri) :
return pd.read(uri).to_html()
@staticmethod
def load_plugin(**_args):
"""
This function will load external module form a given location and return a pointer to a function in a given module
:path absolute path of the file (considered plugin) to be loaded
:name name of the function to be applied
"""
_path = _args['path'] #os.sep.join([_args['root'],'plugin'])
if os.path.isdir(_path):
files = os.listdir(_path)
if files :
files = [name for name in files if name.endswith('.py')]
if files:
_path = os.sep.join([_path,files[0]])
else:
return None
else:
return None
#-- We have a file ...
_name = _args['name']
# return appContext.render(**_args)
# # return _html
# @staticmethod
# def data (_args):
# """
# :store data-store parameters (data-transport, github.com/lnyemba/data-transport)
# :query query to be applied against the store (expected data-frame)
# """
# _store = _args['store']
# reader = transport.factory.instance(**_store)
# _queries= copy.deepcopy(_store['query'])
# _data = reader.read(**_queries)
# return _data
# @staticmethod
# def csv(uri) :
# return pd.read(uri).to_html()
# @staticmethod
# def load_plugin(**_args):
# """
# This function will load external module form a given location and return a pointer to a function in a given module
# :path absolute path of the file (considered plugin) to be loaded
# :name name of the function to be applied
# """
# _path = _args['path'] #os.sep.join([_args['root'],'plugin'])
# if os.path.isdir(_path):
# files = os.listdir(_path)
# if files :
# files = [name for name in files if name.endswith('.py')]
# if files:
# _path = os.sep.join([_path,files[0]])
# else:
# return None
# else:
# return None
# #-- We have a file ...
# _name = _args['name']
spec = importlib.util.spec_from_file_location(_name, _path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# spec = importlib.util.spec_from_file_location(_name, _path)
# module = importlib.util.module_from_spec(spec)
# spec.loader.exec_module(module)
return getattr(module,_name) if hasattr(module,_name) else None
@staticmethod
def plugins(_config) :
"""
This function looks for plugins in the folder on disk (no cloud support) and attempts to load them
"""
PATH= os.sep.join([_config['layout']['root'],'_plugins'])
_map = {}
# if not os.path.exists(PATH) :
# return _map
if 'plugins' not in _config :
_config['plugins'] = {}
_conf = _config['plugins']
# return getattr(module,_name) if hasattr(module,_name) else None
# @staticmethod
# def plugins(_config) :
# """
# This function looks for plugins in the folder on disk (no cloud support) and attempts to load them
# """
# PATH= os.sep.join([_config['layout']['root'],'_plugins'])
# _map = {}
# # if not os.path.exists(PATH) :
# # return _map
# if 'plugins' not in _config :
# _config['plugins'] = {}
# _conf = _config['plugins']
for _key in _conf :
# for _key in _conf :
_path = os.sep.join([PATH,_key+".py"])
if not os.path.exists(_path):
continue
for _name in _conf[_key] :
_pointer = components.load_plugin(path=_path,name=_name)
if _pointer :
_uri = "/".join(["api",_key,_name])
_map[_uri] = _pointer
#
# We are adding some source specific plugins to the user-defined plugins
# This is intended to have out-of the box plugins...
#
if 'source' in _config['system'] and _config['system']['source']['id'] == 'cloud' :
_plugins = cloud.plugins()
else:
_plugins = disk.plugins()
#
# If there are any plugins found, we should load them and use them
# _path = os.sep.join([PATH,_key+".py"])
# if not os.path.exists(_path):
# continue
# for _name in _conf[_key] :
# _pointer = components.load_plugin(path=_path,name=_name)
# if _pointer :
# _uri = "/".join(["api",_key,_name])
# _map[_uri] = _pointer
# #
# # We are adding some source specific plugins to the user-defined plugins
# # This is intended to have out-of the box plugins...
# #
# if 'source' in _config['system'] and _config['system']['source']['id'] == 'cloud' :
# _plugins = cloud.plugins()
# else:
# _plugins = disk.plugins()
# #
# # If there are any plugins found, we should load them and use them
if _plugins :
_map = dict(_map,**_plugins)
return _map
@staticmethod
def context(_config):
"""
adding custom variables functions to Jinja2, this function should be called after plugins are loaded
"""
_plugins = _config['plugins']
# if not location:
# env = Environment(loader=BaseLoader())
# else:
location = _config['layout']['root']
# env = Environment(loader=FileSystemLoader(location))
env = Environment(loader=BaseLoader())
# env.globals['routes'] = _config['plugins']
return env
@staticmethod
def get_system(_config,skip_keys=[]):
_system = copy.deepcopy(_config['system'])
if skip_keys :
for key in skip_keys :
if key in _system :
del _system
return _system
# if _plugins :
# _map = dict(_map,**_plugins)
# return _map
# @staticmethod
# def context(_config):
# """
# adding custom variables functions to Jinja2, this function should be called after plugins are loaded
# """
# _plugins = _config['plugins']
# # if not location:
# # env = Environment(loader=BaseLoader())
# # else:
# location = _config['layout']['root']
# # env = Environment(loader=FileSystemLoader(location))
# env = Environment(loader=BaseLoader())
# # env.globals['routes'] = _config['plugins']
# return env
# @staticmethod
# def get_system(_config,skip_keys=[]):
# _system = copy.deepcopy(_config['system'])
# if skip_keys :
# for key in skip_keys :
# if key in _system :
# del _system
# return _system

@ -23,6 +23,9 @@ import pandas as pd
import uuid
import datetime
from cms import disk, cloud, engine
_app = Flask(__name__)
cli = typer.Typer()
# @_app.route('/favicon.ico')

Loading…
Cancel
Save