#!/usr/bin/env python
"""
This framework allows data to be logged to a given data store i.e :
    - disk, cloud (google, dropbox, box, sugarsync or s3) or a queue server
The intent of the framework is to work as a standalone or embedded in code as a logging framework
usage:

dependencies :
    data-transport  pip install git+https://dev.the-phi.com/git/steve/data-transport.git
"""
import json
import os
import shutil
import uuid
from datetime import datetime

import transport
import typer

import smart
import smart.files
import smart.folder
import smart.info
import smart.logger
import smart.top

_cli = typer.Typer()


@_cli.command(name='log-intruder')
def intrusion(path:str='/var/log/auth.log', year:int=datetime.now().year):
    """
    Read a log file with smart.logger.read and store what was extracted from it.

    Each key of the result that matches a function of smart.logger is handed to
    that function, and the outcome is stored in a table named after the key.

    :path   log file to read
    :year   year passed to smart.logger.read (defaults to the current year)
    """
    _r = smart.logger.read(path=path,year=year)
    if not _r :
        print ()
        print ("Nothing out of the ordinary was found in")
        print (f"{path}")
        return
    for _id in _r :
        if not hasattr(smart.logger,_id):
            continue
        try:
            _pointer = getattr(smart.logger,_id)
            _df = _pointer(_r[_id])
            post(_df,_id)
        except Exception as e:
            #
            # Best effort: a failure on one category is reported and must not
            # prevent the remaining categories from being stored
            print (e)


@_cli.command(name='top')
def apply_apps (app:str=None,user:str=None):
    """
    This function looks at applications/commands running on the system

    :app    name of an application to restrict the result to (optional)
    :user   accepted for the command line, currently not used
    """
    _df = smart.top.read()
    _id = 'apps' if not app else app
    if app :
        _index = _df.name == app
        #
        # NOTE(review): when no row matches, the unfiltered data is stored in a
        # table named after the application -- confirm this is intended
        if _index.sum() :
            _df = _df[_index]
    post(_df,_id)


@_cli.command(name='archive')
def _archive():
    """
    This function will archive the database, by renaming it into
    <database>-archived-on-<year>-<month>-<day>-<hour>-<minute>.db3
    """
    _now = datetime.now()
    # values are deliberately not zero-padded, existing archives are named this way
    _suffix = "-".join([str(_value) for _value in [_now.year,_now.month,_now.day,_now.hour,_now.minute]])
    _path = os.sep.join([smart.info.__home__,smart.info.__database__])
    _src = _path + '.db3'
    if os.path.exists(_src):
        _target = _path +'-archived-on-'+ _suffix+'.db3'
        shutil.move(_src,_target)
        _msg = f"""Archive created successfully at:
        {_target}"""
    else:
        _msg = """
        Archive function is not available at this time, please try after logs have been stored
        """
    print(_msg)


@_cli.command(name='folder')
def apply_folder(path:str):
    """
    This function will read the content of a folder with smart.folder.read
    and store the result in the 'folders' table

    :path   folder to read
    """
    _df = smart.folder.read(path=path)
    post(_df,'folders')


@_cli.command (name='files')
def apply_files(folder:str) :
    """
    This function will read a folder with smart.files.read and store the
    result in the 'files' table

    :folder folder to read
    """
    _df = smart.files.read(folder)
    post(_df,'files')


@_cli.command(name='register')
def apply_signup (email:str,key:str=None,provider:str='sqlite') :
    """
    This function will write the configuration file (config.json) used to store data

    :email      email stored in the system section of the configuration
    :key        accepted for the command line, currently not used
    :provider   data store provider, sqlite stores its database in the home folder
    """
    _config = {
        "system":{"email":email,"uid":str(uuid.uuid4()),"version":smart.info.__version__},
        "store":{"provider":provider,"context":"write"}
    }
    _db = smart.info.__database__
    if provider in ['sqlite','sqlite3'] :
        # sqlite needs a file on disk, other providers are given the database name as is
        _db = os.sep.join([smart.info.__home__,_db+'.db3'])
    _config['store']['database'] = _db
    #
    # Let us store this in a folder
    _PATH = smart.info.__home__
    _file = os.sep.join([_PATH,'config.json'])
    # the message is about the file, hence the check is on the file and not its folder
    _verb = "updated" if os.path.exists(_file) else "written"
    # creates missing parent folders and tolerates a folder that already exists
    os.makedirs(_PATH,exist_ok=True)
    with open(_file,'w',encoding='utf-8') as f :
        f.write(json.dumps(_config))
    _msg = f"""
        The configuration file was {_verb} successfully at {smart.info.__home__}
        data store:
            provider {provider}
            database {_db}

        If your database has security enabled, consider updating "{smart.info.__home__}{os.sep}config.json" For appropriate security
        Visit https://github.com/lnyemba/data-transport for more information
    """
    print ()
    print (_msg)


def post(_df,_table):
    """
    Store data in a given location

    The location is read from the "store" section of config.json in the home
    folder; the target is passed as "collection" for mongodb/couchdb providers
    and as "table" otherwise.

    :_df        data handed to the writer
    :_table     name of the table (or collection) to write to
    :raises     FileNotFoundError if the configuration file does not exist
    """
    _path = os.sep.join([smart.info.__home__,'config.json'])
    try:
        with open (_path,encoding='utf-8') as f :
            _config = json.load(f)
    except FileNotFoundError as e:
        # same exception type as before, with a hint on how to fix it
        raise FileNotFoundError(f"Configuration not found at {_path}, please run the 'register' command first") from e
    _store = _config['store']
    if _store['provider'] in ['mongodb','mongo','couch','couchdb'] :
        _store['collection'] = _table
    else:
        _store['table'] = _table
    writer = transport.factory.instance(**_store)
    try:
        writer.write(_df)
    finally:
        # release the writer even when the write fails
        if hasattr(writer,'close') :
            writer.close()


if __name__ == '__main__' :
    _cli()

# from transport import factory
# class logger :
#     """
#         This class is a basic logger, it will log data regardless of the types of data, We will have subclasses that will implement various data extraction schemas:
#         - processes (top),
#     """
#     def __init__(self,**args):
#         """
#             :store  data store (disk,mongo,couch,google,dropbox)
#             :args   arguments to pass for the data-store (read transport documentation)
#             :notify function that returns true/false for notification
#         """
#         self.store = factory.instance(type=store,args=args['args'])
#         if 'notify' in args :
#             self.notify = args
#         pass
#     def log(self,row):
#         """
#             This function will log data to a data store
#             :row    row to be stored
#         """
#         self.store.write(row=row)
#         if(hasattr(self,'notify')):
#             if (self.notify(row)) :
#                 #
#                 # Let us notify the backend by generating a report and submitting it
#                 #
#                 stream = self.get.report()
#                 pass
#             else:
#                 pass
#     def report(self) :