#!/usr/bin/env python """ This framework allows data to be logged to a given data store i.e : - disk, cloud (google, dropbox, box, sugarsync or s3) or a queue server The intent of the framework is to work as a standalone or embedded in code as a logging framework usage: dependencies : data-transport pip install git+https://dev.the-phi.com/git/steve/data-transport.git """ # import smart import smart.top import smart.folder import smart.top import smart.logger import smart.files import uuid import typer import smart.info import json import os import transport import shutil from datetime import datetime _cli = typer.Typer() @_cli.command(name='log-intruder') def intrusion(path:str='/var/log/auth.log', year:int=datetime.now().year): """ This function """ _r = smart.logger.read(path=path,year=year) if _r : for _id in _r : if hasattr(smart.logger,_id): try: _pointer = getattr(smart.logger,_id) _df = _pointer(_r[_id]) post(_df,_id) except Exception as e: print (e) pass else: print () print ("Nothing out of the ordinary was found in") print (f"{path}") @_cli.command(name='top') def apply_apps (app:str=None,user:str=None): """ This function looks at applications/commands running on the system """ _df = smart.top.read() _id = 'apps' if not app else app if app : _index = _df.name == app if _index.sum() : _df = _df[_index] post(_df,_id) @_cli.command(name='archive') def _archive(): """ This function will archive the database, by renaming it into """ _suffix = datetime.now() _suffix = "-".join([str(_value) for _value in [_suffix.year,_suffix.month,_suffix.day,_suffix.hour,_suffix.minute]]) _path = os.sep.join([smart.info.__home__,smart.info.__database__]) _src = _path + '.db3' if os.path.exists(_src): _target = _path +'-archived-on-'+ _suffix+'.db3' shutil.move(_src,_target) _msg = f"""Archive created successfully at: {_target}""" else: _msg = """ Archive function is not available at this time, please try after logs have been stored """ print(_msg) @_cli.command(name='folder') def apply_folder(path:str): """ This function will read the content of a folder and generate a """ _df = smart.folder.read(path=path) # print (_df) post(_df,'folders') pass @_cli.command (name='files') def apply_files(folder:str) : _df = smart.files.read(folder) post(_df,'files') @_cli.command(name='register') def apply_signup (email:str,key:str=None,provider:str='sqlite') : _config = {"system":{"email":email,"uid":str(uuid.uuid4()),"version":smart.info.__version__},"store":{"provider":provider,"context":"write"}} _db = smart.info.__database__ if provider in ['sqlite','sqlite3'] : _db = os.sep.join([smart.info.__home__,_db+'.db3']) _config['store']['database'] = _db else: _config['store']['database'] = _db # # Let us store this in a folder _PATH = smart.info.__home__ _verb = "written" if not os.path.exists(_PATH) : os.mkdir(_PATH) else: _verb = "updated" f = open(os.sep.join([_PATH,'config.json']),'w') f.write(json.dumps(_config)) f.close() _msg = f""" The configuration file was {_verb} successfully at {smart.info.__home__} data store: provider {provider} database {_db} If your database has security enabled, consider updating "{smart.info.__home__}{os.sep}config.json" For appropriate security Visit https://github.com/lnyemba/data-transport for more information """ print () print (_msg) pass def post(_df,_table): """ Store data in a given location """ _path = os.sep.join([smart.info.__home__,'config.json']) f = open (_path) _config = json.loads(f.read()) f.close() _store = _config['store'] if _store['provider'] in ['mongodb','mongo','couch','couchdb'] : _store['collection'] = _table else: _store['table'] = _table writer = transport.factory.instance(**_store) writer.write(_df) if hasattr(writer,'close') : writer.close() if __name__ == '__main__' : _cli() # from transport import factory # class logger : # """ # This class is a basic logger, it will log data regardless of the types of data, We will have subclasses that will implement various data extraction schemas: # - processes (top), # """ # def __init__(self,**args): # """ # :store data store (disk,mongo,couch,google,dropbox) # :args arguments to pass for the data-store (read transport documentation) # :notify function that returns true/false for notification # """ # self.store = factory.instance(type=store,args=args['args']) # if 'notify' in args : # self.notify = args # pass # def log(self,row): # """ # This function will log data to a data store # :row row to be stored # """ # self.store.write(row=row) # if(hasattr(self,'notify')): # if (self.notify(row)) : # # # # Let us notify the backend by generating a report and submitting it # # # stream = self.get.report() # pass # else: # pass # def report(self) :