You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
cms/cms/sites/__init__.py

643 lines
25 KiB
Python

"""
This file will describe how a site is loaded
"""
from cms.engine.config.structure import Layout, System
from cms import disk, cloud
import cms
import copy
import json
import io
import os
from jinja2 import Environment, BaseLoader, FileSystemLoader
from datetime import datetime
import mistune
from mistune import markdown
from flask import make_response, render_template
import numpy as np
import pandas as pd
class RequestController:
    """
    Classifies an incoming request (file, api call or route of a sub-site) using its path.

    The request only needs to expose a ``path`` attribute that starts with a forward slash.
    """
    def __init__(self, _route):
        """
        :_route     collection of route names, each name is a path without leading/trailing slash
        """
        self._routes = _route

    def isfile(self, request):
        """
        Returns True when the last segment of the path has an extension.
        Only the last segment is inspected so a dotted folder (e.g. /v1.0/about) is not taken for a file
        """
        #
        # @TODO: call self.exists to determine if this is an actual file or not
        return '.' in request.path.rsplit('/', 1)[-1]

    def isroute(self, request):
        """
        Returns True when the path of the request starts with one of the registered routes
        """
        #
        # get_route only ever returns a name that is registered, None means there was no match
        return self.get_route(request) is not None

    def isapi(self, request):
        """
        Returns True when one of the segments of the path is exactly 'api'.
        Matching a whole segment avoids false positives on names that merely contain the letters (e.g. therapist.html)
        """
        return 'api' in request.path.split('/')

    def get_route(self, request):
        """
        Returns the longest registered route that is a prefix of the request path (segment by segment) or None
        e.g: with routes ['blog','blog/admin'] the path /blog/admin/index.html yields 'blog/admin'
        """
        _items = request.path[1:].split('/')
        if _items[-1] == '':
            #
            # a trailing slash (or the root path) leaves an empty segment at the end
            _items = _items[:-1]
        _found = None
        for _size in range(1, len(_items) + 1):
            _name = '/'.join(_items[:_size])
            if _name in self._routes:
                #
                # prefixes are visited from shortest to longest, the last match is the most specific
                _found = _name
        return _found
class IOConfig:
    """
    This class encapsulates read/write for configuration

    The configuration is a nested dictionary that is addressed with dotted expressions (e.g: system.context).
    The class also keeps an in-memory list of log entries that are returned by debug()
    """
    def __init__(self, **_args):
        """
        :location   optional, stored as-is in self._location (None when not provided)
        """
        self._config = {'system': {}, 'layout': {}, 'plugins': {}}
        self._caller = None
        self._location = _args.get('location')
        self._logs = []

    def get(self, _key):
        """
        Returns the value found at a dotted path or None if any part of the path is missing
        :_key   dotted path e.g: system.context, a falsy key (None, '') returns the live configuration itself

        The value returned for a path is a deep copy, callers can alter it without affecting the configuration
        """
        #
        # @TODO: Include elements to be skipped (just in case & for security reasons)
        if not _key:
            return self._config
        _object = self._config
        for _name in _key.split('.'):
            #
            # Only dictionaries can be traversed, a path that runs through a scalar/list does not exist
            if not isinstance(_object, dict) or _name not in _object:
                return None
            _object = _object[_name]
        #
        # Copying only what is returned is equivalent to copying everything upfront, without the cost
        return copy.deepcopy(_object)

    def set(self, _expr, value):
        """
        Set the value of an attribute given the path with dot of the attribute
        _expr   expression of the attribute e.g: person.age
        value   value to be set for the attribute

        Intermediate dictionaries that are missing are created along the way
        """
        *_parents, _leaf = _expr.split('.')
        _object = self._config
        for _attr in _parents:
            if _attr not in _object:
                _object[_attr] = {}
            _object = _object[_attr]
        _object[_leaf] = value

    def log(self, **_args):
        """
        Appends an entry to the in-memory logs, the entry has the date, time and current system.context
        in addition to whatever keyword arguments are provided (they take precedence)
        """
        #
        # A single timestamp guarantees the date and the time belong to the same instant
        _now = datetime.now()
        _row = {'_date': _now.strftime('%Y-%m-%d'), 'time': _now.strftime('%H:%M:%S'),
                'context': self.get('system.context')}
        _row.update(_args)
        self._logs.append(_row)

    def debug(self, **_args):
        """
        Returns the logs collected so far and their mime type, the arguments are ignored
        """
        return self._logs, 'application/json'
class Initialization(IOConfig):
    """
    Loads the configuration of a site and derives everything needed at runtime from it :
    context, locations (icon, project folder, security key), menu and plugins.

    Arguments read by this class (all passed as keywords) :
    :path       path of the configuration file (JSON)
    :stream     JSON configuration as a string or an io.StringIO (used when path is not provided)
    :caller     parent site, provided when this site is loaded as a route of a portal
    :context    context of the site, overrides system.context of the configuration
    """
    def __init__(self, **_args):
        super().__init__(**_args)
        self._args = _args
        #
        # Invoke initialization
        self.reload()
        #
        # A site that has a parent shares the security manager of the parent
        if self._caller and self._caller.secure:
            self.secure = self._caller.secure
        else:
            self.secure = cms.secure.Manager(config=self._config)
        #
        # Log initialization ...
        #
        self.log(action='init.security', module='site.init',
                 input=self.secure._permissions.to_dict(orient='records'))

    def reload(self):
        """
        (Re)reads the configuration and rebuilds the context, menu and plugins from it
        """
        _args = self._args
        #
        # The caller is resolved first so that read_config can report it in the logs
        self._caller = _args.get('caller')
        self._config = self.read_config(**_args)
        #
        # The sections are created if the configuration doesn't have them, the code below relies on them
        _system = self._config.setdefault('system', {})
        _layout = self._config.setdefault('layout', {})
        if self._caller:
            _system['parentContext'] = ''
        if not _system.get('icon'):
            _system['icon'] = None
        #
        # The location is always reset, it is determined by locations() from the path of the configuration
        # NOTE(review): the original indentation was lost, this reset is assumed to be unconditional
        _layout['location'] = None
        if not _layout.get('menu'):
            _layout['menu'] = {}
        self.context(**_args)
        self.menu()
        self.plugins()

    def read_config(self, **_args):
        """
        Returns the configuration as a dictionary, read from a file (path) or a JSON stream (stream).
        An empty dictionary is returned if neither is provided or if the stream is of an unsupported type
        """
        _config = {}
        if 'path' in _args:
            with open(_args['path'], encoding='utf-8') as f:
                _config = json.load(f)
        elif 'stream' in _args:
            _stream = _args['stream']
            if isinstance(_stream, str):
                _config = json.loads(_stream)
            elif isinstance(_stream, io.StringIO):
                _config = json.loads(_stream.read())
        #
        # The caller is an instance (not a class or function) hence we fall back on the name of its class
        _name = None
        if self._caller:
            _name = getattr(self._caller, '__name__', type(self._caller).__name__)
        self.log(action='init.config', module='read_config', input={'caller': _name})
        return _config

    def context(self, **_args):
        """
        Updating the context so that we can have a consistent representation
        i.e system.context, system.parentContext and (when there is a parent site) system.caller & system.onport
        """
        if 'context' in _args:
            self.set('system.context', _args['context'])
        _context = self.get('system.context') or ''
        _context = _context[:-1] if _context.endswith('/') else _context
        if self._caller:
            #
            # There is a parent context we need to account and we are updating the current context to reflect the caller context
            _callerContext = self._caller.get('system.context')
            _parentContext = _callerContext or ''
            _context = '/'.join([_parentContext, _context])
            #
            # updating the configuration
            _iconURI = '/'.join(["", _parentContext, self._caller.get('system.icon') or ''])
            if _parentContext != '':
                _parentContext = "/" + _parentContext
                _context = "/" + _context
                self.set('system.onport', 0)
            else:
                _iconURI = _iconURI.replace("///", "/")
                self.set('system.onport', 1)
            self.set('system.caller', {'icon': _iconURI, 'context': _callerContext})
        else:
            #
            # If we have no parent site, there should be no parent context (or empty string)
            _parentContext = _context  # @TODO: should be none ?
        #
        # Updating context so it is usable throughout the code base
        self.set('system.context', _context.strip())
        self.set('system.parentContext', _parentContext)
        p = {'has_caller': self._caller is not None, 'context': _context}
        self.log(action='init.context', module='context', input=p)
        #
        # loosely, context to a certain extent involves locations of icons and document root
        #
        self.locations(**_args)

    def locations(self, **_args):
        """
        We are updating the icons, project location file and security key (if any), we need to make sure that context is updated first
        """
        _context = self.get('system.context')
        _logo = self.get('system.logo')
        _root = self.get('layout.root')
        #
        # The logo is expressed relative to the context, the document root is removed from it
        _icon = f'{_context}/{_logo}'
        if _root:
            _icon = _icon.replace(_root, '')
        self.set('system.icon', _icon)
        self.set('system.logo', _icon)
        #
        # from the path provided we can determine the location of the project
        # A configuration loaded from a stream has no path, in which case the location is left as-is
        _path = _args.get('path')
        if _path and _root is not None:
            _homefolder = os.sep.join(_path.split(os.sep)[:-1])
            if _homefolder and os.path.exists(os.sep.join([_homefolder, _root])):
                self.set('layout.location', _homefolder)
        #
        # updating the security key and trying to determine the location
        # The key is looked up as provided first, then relative to the project location
        _keyfile = self.get('system.source.key')
        if _keyfile:
            _candidates = [_keyfile]
            _location = self.get('layout.location')
            if _location:
                _candidates.append(os.sep.join([_location, _keyfile]))
            for _candidate in _candidates:
                if os.path.isfile(_candidate):
                    with open(_candidate) as f:
                        self.set('system.source.key', f.read())
                    break

    def menu(self, **_args):
        """
        This function will build the menu of the site (if need be)
        The menu is built by the cloud or disk handler, then layout.overwrite is applied to the items (matched by text)
        """
        _handler = cloud if self.get('system.source.id') == 'cloud' else disk
        _object = _handler.build(self._config)
        _overwrite = self.get('layout.overwrite') or {}
        for _name in _object:
            _submenu = _object[_name]
            for _index, _item in enumerate(_submenu):
                _text = _item['text'].strip()
                if _text not in _overwrite:
                    continue
                _changes = _overwrite[_text]
                #
                # An item points to either a uri or a url, the url of the overwrite replaces the uri
                if 'uri' in _item and 'url' in _changes:
                    del _item['uri']
                _submenu[_index] = dict(_item, **_changes)
        self.set('layout.menu', _object)
        self.routes()  # -- other apps/sites integrated in case we are operating in portal mode
        #
        # At this point the entire menu is built and we need to have it sorted
        self.order()

    def order(self, **_args):
        """
        Reorders the menu according to layout.order.menu (list of menu names).
        Menus that are not listed are kept and placed after the listed ones in their current order
        """
        _ordered = self.get('layout.order.menu')
        if not _ordered:
            return
        _menu = self.get('layout.menu') or {}
        _sorted = {_name: _menu[_name] for _name in _ordered if _name in _menu}
        for _name in _menu:
            if _name not in _sorted:
                _sorted[_name] = _menu[_name]
        self.set('layout.menu', _sorted)

    def routes(self, **_args):
        """
        This method will update menu with dependent sites/apps and add them to the menu
        Only the routes that have a 'menu' attribute are added, under the menu that bears that name
        """
        _routes = self.get('system.routes')
        if not _routes:
            return
        _overwrite = self.get('layout.overwrite') or {}
        _context = self.get('system.context')
        _menu = self.get('layout.menu') or {}
        for _name, _item in _routes.items():
            if 'menu' not in _item:
                continue
            _uri = f'{_context}/{_name}'
            _label = _item['menu']
            #
            # The text shown can be overwritten, the name of the route is used otherwise
            _text = _overwrite[_name].get('text', _name) if _name in _overwrite else _name
            _menu.setdefault(_label, []).append({"text": _text, "uri": _uri, "type": "open"})
        #
        # updating menu ...
        self.set('layout.menu', _menu)

    def plugins(self, **_args):
        """
        This function will load the plugins from disk (only)
        The 'plugins' section of the configuration maps a file name (without extension) to a list of function names,
        once loaded the section is replaced by a map of endpoint (uri) to function pointer
        """
        _location = self.get('layout.location')
        _root = self.get('layout.root')
        #
        # Without a project location or document root there is no plugin folder to look into
        _folder = None
        if _location is not None and _root is not None:
            _folder = os.sep.join([_location, _root, '_plugins'])
        _context = self.get('system.context')
        _map = {}
        #
        # The debug endpoint is always available, it returns the logs
        _uri = f'{_context}/api/system/debug'
        if _uri.startswith('/'):
            _uri = _uri[1:]
        _plugins = {_uri: self.debug}
        _items = self.get('plugins')
        if _folder and os.path.exists(_folder) and _items:
            for _filename in _items:
                _path = os.sep.join([_folder, _filename + '.py'])
                if not os.path.exists(_path):
                    #
                    # log the error ...
                    self.log(action='missing.plugins', module='plugins', input=_path)
                    continue
                for _module in _items[_filename]:
                    _pointer = disk.plugins(path=_path, name=_module, context=_context)
                    if not _pointer:
                        self.log(action='missing.function', module='plugins', input={'name': _module, 'path': _path})
                        continue
                    _uri = f"api/{_filename}/{_module}"
                    _uri = f"{_context}/{_uri}" if _context else _uri
                    if _uri.startswith("/"):
                        _uri = _uri[1:]
                    _map[_uri] = _pointer
                    self.log(action='load.plugin', module='plugins', input={'uri': _uri, 'path': _path, 'name': _module})
        #
        # Updating plugins from disk/cloud
        _plugins.update(_map)
        #
        # if we have login enabled we should add them as a plugins
        #
        self.set('plugins', _plugins)
        self.log(action='init.plugins', module='plugins', input=list(_plugins.keys()))
class Site(Initialization):
    """
    A site that can resolve a request to a file, read/render the file and run plugins (api calls)
    """
    def __init__(self, **_args):
        super().__init__(**_args)
        # NOTE(review): portal is True when there are NO routes, this looks inverted - confirm against the templates
        self._config['system']['portal'] = self.get('system.routes') is None
        self._routes = []
        if self.get('system.routes'):
            self._routes = list(self.get('system.routes').keys())
        if self._caller:
            #
            # A site loaded by a portal uses the routes of the portal to interpret requests
            self._routes = self._caller._routes
        self.inspect = RequestController(self._routes)
        #
        # let's update the plugins
        # The registry is updated in place : get('plugins') returns a deep copy, storing that copy back would
        # re-bind the debug endpoint to a snapshot of this site
        if self.secure.method() in ['oauth2.0', 'oauth20', 'oauth2']:
            _plugins = self._config.setdefault('plugins', {})
            _plugins['api/oauth2/authorize'] = cms.authorizationURL
            _plugins['api/oauth2/final'] = cms.oauthFinalize

    @staticmethod
    def _issafe(_uri):
        """
        Returns False for an empty uri or a uri that climbs up the folder hierarchy (has a '..' segment).
        The uri can come straight from a header/argument of the request and must not reach outside the site
        """
        if not _uri:
            return False
        return '..' not in _uri.replace('\\', '/').split('/')

    def exists(self, uri):
        """
        Returns True if the uri designates something that exists on disk (see path)
        """
        return os.path.exists(self.path(uri))

    def mimeType(self, uri):
        """
        Returns the mime type associated with the extension of the uri, application/octet-stream by default
        Markdown is reported as text/html because it is converted to html before being served
        """
        _extension = uri.split('.')[-1].strip().lower()
        _types = {
            'css': 'text/css', 'csv': 'text/csv', 'html': 'text/html', 'md': 'text/html',
            'js': 'text/javascript', 'txt': 'text/plain',
            'png': 'image/png', 'jpg': 'image/jpeg', 'jpeg': 'image/jpeg',
        }
        return _types.get(_extension, 'application/octet-stream')

    def path(self, _uri):
        """
        Returns the path on disk of a uri i.e <layout.location>/<layout.root>/<uri>
        The location is skipped when not set, the root is skipped when not set or already part of the uri
        """
        _parts = []
        _location = self.get('layout.location')
        _root = self.get('layout.root')
        if _location:
            _parts.append(_location)
        if _root and _root not in _uri:
            _parts.append(_root)
        _parts.append(_uri)
        return os.sep.join(_parts)

    def html(self, _request):
        """
        Reads the text file designated by the request and returns (content, mime type)
        markdown is converted to html and wrapped in a div, html is rendered against the configuration (jinja2),
        anything else is returned as-is. (None, 'plain/html') is returned if the uri is not acceptable
        """
        _uri = self.uri(_request)
        if not self._issafe(_uri):
            return None, 'plain/html'
        _mimeType = self.mimeType(_uri)
        with open(self.path(_uri), 'r') as f:
            _content = f.read()
        #
        # NOTE(review): the original indentation was lost, the nesting below (markdown vs html within text/html,
        # other text files untouched) is the most plausible reading - confirm against the repository
        if 'md' in _mimeType or 'html' in _mimeType:
            if _uri.split('.')[-1].strip().lower() == 'md':
                #
                # The entities are restored so that html/tags written within markdown are preserved
                _content = mistune.html(_content).replace("&quot;", '"').replace("&lt;", "<").replace("&gt;", ">")
                _content = f"<div>{_content}</div>"
            else:
                _env = Environment(loader=BaseLoader()).from_string(_content)
                _content = str(_env.render(**self.get(None)))
        return _content, _mimeType

    def uri(self, request):
        """
        Returns the uri of the file a request refers to, by order of precedence :
        the 'uri' header, the 'uri' argument, the path of the request without its route (if any).
        layout.index is returned if the path doesn't designate anything
        """
        if 'uri' in request.headers:
            return request.headers['uri']
        if 'uri' in request.args:
            return request.args.get('uri', None)
        _route = self.inspect.get_route(request)
        file = request.path.lstrip('/')
        #
        # The route doesn't have any forward slashes nor slash at the end
        # Only the leading route is removed, other occurrences of the name are part of the file
        if _route and (file == _route or file.startswith(_route + '/')):
            file = file[len(_route):].lstrip('/')
        return file if file.strip() not in ['', '/'] else self.get('layout.index')

    def read(self, request):
        """
        Returns (content, mime type) of the file designated by the request
        text files are returned as a string (see html), others as an io.BytesIO.
        (None, 'plain/html') is returned if there is nothing to return
        """
        _uri = self.uri(request)
        if not self._issafe(_uri):
            self.log(action='file.read', module='site.read', input={'allow': 0, 'uri': _uri, 'request': request.path})
            return None, 'plain/html'
        #
        # The extension is what follows the last dot, consistent with mimeType (e.g: jquery.min.js is js)
        _extension = _uri.rsplit('.', 1)[-1].lower() if '.' in _uri else ''
        _mimeType = self.mimeType(_uri)
        if _extension.strip() in ['md', 'txt', 'html', 'js', 'css']:
            _content, _mimeType = self.html(request)
        else:
            #
            # Opening a binary file
            with open(self.path(_uri), 'rb') as f:
                _content = io.BytesIO(f.read())
        _kwargs = {'allow': 1, 'mimeType': _mimeType, 'extension': _extension,
                   'path': self.path(_uri),
                   'uri': _uri, 'request': request.path}
        self.log(action='file.read', module='site.read', input=_kwargs)
        if _content:
            return _content, _mimeType
        return None, 'plain/html'

    def render(self, **_args):
        """
        Returns the arguments to render the main template with or None if there is no content
        :id         target of the dom or jinja
        :request    incoming request

        The arguments are layout, system (without source & app), username (if logged in) and the content under <id>
        """
        _id = _args.get('id')
        _request = _args['request']
        _kwargs = {'layout': self.get('layout')}
        #
        # get returns a copy, removing attributes here doesn't affect the configuration
        _system = self.get('system')
        for k in ['source', 'app']:
            if k in _system:
                del _system[k]
        _kwargs['system'] = _system
        #
        # The cookie is provided by the client, it can not be trusted to be well-formed
        try:
            _cookies = json.loads(_request.cookies.get(self.secure._authContext, "{}"))
        except (TypeError, ValueError):
            _cookies = {}
        if isinstance(_cookies, dict) and 'username' in _cookies:
            _kwargs['username'] = _cookies['username']
        _html, _mimeType = self.html(_request)
        if _html:
            _env = Environment(loader=BaseLoader()).from_string(f'<div id="{_id}">{_html}</div>')
            _kwargs[_id] = str(_env.render(**_kwargs))
            return _kwargs
        return None

    def apply_tags(self, _html):
        """
        Returns the html wrapped in a div, with its jinja2 tags resolved against layout & system
        """
        _kwargs = {'layout': self.get('layout'), 'system': self.get('system')}
        _env = Environment(loader=BaseLoader()).from_string(f'<div>{_html}</div>')
        return _env.render(**_kwargs)

    def run(self, _request):
        """
        Runs the plugin registered for the path of the request and returns (data, status code, headers)
        A plugin is called with request= & config=, it returns data if it has a 'mimetype' attribute and
        (data, mime type) otherwise. A 'method' attribute, if any, holds the http methods the plugin accepts.
        404 is returned if there is no plugin, if the method is not accepted or if the plugin returns nothing
        """
        #
        # The registry is read directly, get('plugins') would deep copy every function pointer on each request
        _plugins = self._config.get('plugins') or {}
        _data = "<div align='center'><h2>404</h2></div>"
        # NOTE(review): 'plain/html' is not a registered media type (text/html is), kept as-is for compatibility
        _mimeType = 'plain/html'
        _code = 404
        _key = _request.path[1:]
        _pointer = _plugins.get(_key)
        if _pointer is None:
            return _data, _code, {"Content-Type": _mimeType}
        if hasattr(_pointer, 'method') and _request.method not in _pointer.method:
            #
            # The plugin must not run if the http method is not one it accepts
            return _data, _code, {"Content-Type": _mimeType}
        if hasattr(_pointer, 'mimetype'):
            _mimeType = _pointer.mimetype
            _data = _pointer(request=_request, config=self.get(None))
        else:
            _data, _mimeType = _pointer(request=_request, config=self.get(None))
        if 'html' in _mimeType:
            _data = self.apply_tags(_data)
        if isinstance(_data, pd.DataFrame):
            _data = _data.to_json(orient='records')
        elif isinstance(_data, (dict, list)):
            _data = json.dumps(_data)
        #
        # return the data
        _code = 200 if _data else 404
        return _data, _code, {"Content-Type": _mimeType}
class QCMS:
    """
    Entry point of the application : holds the main site and the sites it routes to (portal mode)
    and delegates a request to the site it is meant for
    """
    def __init__(self, **_args):
        """
        The arguments are those of the main site (see Site), every entry of system.routes is loaded
        as a site of its own from the 'path' attribute of the entry
        """
        _app = Site(**_args)
        self._id = _app.get('system.context')
        #
        # The main site is registered under its context and under '/'
        self._sites = {self._id: _app, '/': _app}
        self._routes = []
        _routes = _app.get('system.routes')
        if _routes:
            for _name in _routes:
                self._routes.append(_name)
                _path = _routes[_name]['path']
                self._sites[_name] = Site(context=_name, path=_path, caller=_app)
        self.inspect = RequestController(self._routes)

    def _render(self, request):
        """
        Returns the main template rendered with the content the request refers to, or a 404 response if there is none
        """
        _site = self.get(request)
        _args = _site.render(request=request, id='index')
        if _args is None:
            return "<div align='center'><h2>404</h2></div>", 404, {'Content-Type': 'text/html'}
        return render_template('index.html', **_args)

    def _read(self, request):
        """
        Returns (content, mime type) of the file the request refers to, read by the site the request is meant for
        """
        _site = self.get(request)
        return _site.read(request)

    def delegate(self, request):
        """
        Processes a request according to its nature :
            - neither api nor file  -> the page is rendered
            - file                  -> the content of the file is returned (takes precedence over api)
            - api                   -> the plugin is run
        """
        isfile = self.inspect.isfile(request)
        isapi = self.inspect.isapi(request)
        if not isapi and not isfile:
            return self._render(request)
        #
        # let's check on files to be serviced (assumption is that they should have an extension)
        # The following propositional logic is as such
        # (isfile and isroute) or (isfile and not isroute) -> isfile (isroute is optional)
        if isfile:
            _content, _mimeType = self._read(request)
            if _content is None:
                return "<div align='center'><h2>404</h2></div>", 404, {'Content-Type': 'text/html'}
            return _content, 200, {'Content-Type': _mimeType}
        _site = self.get(request)
        return _site.run(request)

    def get(self, request=None):
        """
        Returns the site a request is meant for : the site of the route found in the path of the request,
        the main site otherwise (no request, no route). The identifier of the site is kept in self._id
        """
        if request and self.inspect.isroute(request):
            self._id = self.inspect.get_route(request)
        else:
            self._id = ''
        #
        # The main site is always registered under '/', its context is not necessarily an empty string
        return self._sites.get(self._id, self._sites['/'])

    def set(self, _id):
        """
        Sets the identifier of the current site
        """
        self._id = _id

    def allow(self, id, request):
        """
        Returns whatever the security manager of a site returns for the request
        :id         identifier of a site; a request-like object (has a path) is also accepted, any other
                    value designates the main site
        :request    incoming request
        """
        if self.has(id):
            _site = self._sites[id]
        elif hasattr(id, 'path'):
            _site = self.get(id)
        else:
            _site = self.get()
        return _site.secure.allow(request=request)

    def has(self, _id):
        """
        Returns True if a site is registered with the identifier
        """
        return _id in self._sites