From 39b32768e42ab29c38c206dffeebe74027bf9a3d Mon Sep 17 00:00:00 2001 From: "Steve L. Nyemba" Date: Mon, 14 Aug 2017 02:22:13 -0500 Subject: [PATCH] DC - insured the data is logged where the plan specified --- src/data-collector.py | 1 + src/utils/agents/data-collector.py | 79 ++++ src/utils/agents/manager.pyc | Bin 3622 -> 3745 bytes src/utils/transport.py | 676 ----------------------------- src/utils/transport.pyc | Bin 21410 -> 21410 bytes 5 files changed, 80 insertions(+), 676 deletions(-) delete mode 100644 src/utils/transport.py diff --git a/src/data-collector.py b/src/data-collector.py index 0c3cbd2..8866dde 100644 --- a/src/data-collector.py +++ b/src/data-collector.py @@ -44,6 +44,7 @@ class Collector(Thread) : This funtion runs the authorized features and """ #self.monitor.start() + thread = Thread(target=self.monitor.run) thread.start() # print self.monitor.config['store'] diff --git a/src/utils/agents/data-collector.py b/src/utils/agents/data-collector.py index e727346..6e451df 100644 --- a/src/utils/agents/data-collector.py +++ b/src/utils/agents/data-collector.py @@ -15,6 +15,85 @@ import time from datetime import datetime from utils.transport import * import monitor +class Manager(Thread) : + """ + delay : + limit : + scope : apps,folders,learner,sandbox + """ + def __init__(self): + Thread.__init__(self) + self.lock = RLock() + self.factory = DataSourceFactory() + def init(self,args) : + node,pool,config + self.id = args['node'] + self.pool = args['pool'] + self.config = args['config'] + self.key = args['key'] + + self.status() #-- Initializing status information + def status(self) : + """ + This method inspect the plans for the current account and makes sure it can/should proceed + The user must be subscribed and to the service otherwise this is not going to work + """ + url="https://the-phi.com/store/status/monitor" + r = requests.post(url,headers={"uid":self.key}) + plans = json.loads(r.text) + + meta = [item['metadata'] for item in plans if item['status']=='active' ] + if len(meta) > 0 : + self.DELAY = 60* max([ int(item['delay']) for item in meta if ]) + self.LIMIT = max([ int(item['limit']) for item in meta if ]) + else: + self.DELAY = -1 + self.LIMIT = -1 + scope = [] + [ scope += item['scope'].split(',') for item in meta ] + names = [ for agent in self.pool if agent.getName() in scope] + return meta + + def isvalid(self): + self.status() + return self.DELAY > -1 and self.LIMIT > -1 + def run(self): + #DELAY=35*60 #- 35 Minutes + #LIMIT=1000 + COUNT = 0 + COUNT_STOP = int(24*60/ self.DELAY) + print COUNT_STOP + write_class = self.config['store']['class']['write'] + read_args = self.config['store']['args'] + + while True : + COUNT += 1 + if COUNT > COUNT_STOP : + if self.isvalid() : + COUNT = 0 + else: + break + for agent in self.pool : + + data = agent.composite() + label = agent.getName() + node = '@'.join([label,self.id]) + row = {} + if label == 'folders': + row = [ dict({"id":self.id}, **_row) for _row in data] + + else: + label = id + row = data + + self.lock.acquire() + store = self.factory.instance(type=write_class,args=read_args) + store.flush(size=self.LIMIT) + store.write(label=label,row=row) + self.lock.release() + time.sleep(self.DELAY) + + class ICollector(Thread) : def __init__(self) : diff --git a/src/utils/agents/manager.pyc b/src/utils/agents/manager.pyc index e71f7dfa3ba5f6904d4d650d7f4b2fb0b01183d8..c7f9129b75895349e3785b4aee8164e124803deb 100644 GIT binary patch delta 513 zcmXw#PiqrV6vfY-Ox~MJW+oIV2BQ%P(N*LkL9vc~nzc@TRhQ{}9?Dbmrpj57!ic!nTm8)e zC-xjQc+L*0c|$0FveQ)K@AkTvclftmZFhyDYG{U9UXNNdPgm(CwJUr-ST~o%%QOBN ztkL30P-}Y&7nMw~jw^hTXQTZ=zMwo&q`&^n_U6v^-n(LuU*RY9hbB|^8h@?3bY5$; z&Gn0ZzNkj5vc%nJjh4BLdPMvqx|>W!J`hj4!A<`*--=U8c{R?=toE$%$M`bM@NvBV E4>WmQPyhe` delta 439 zcmXw#Jx>Bb5Qb;ZV~>x!Gh!lM!dHVrfB+^4HEKh&IZZ574Dl`mRF02BEsj{=FK|jL z8+$`xVry%uzeE!ojn0y|$-a5>&NH)*NPn@2Gi%iwYFWUy2)Iw7z$m@tSC)Bl8hqN%^5 z5*mlG0-gdL0x0Eh%3+XTlr$vhlc9H1F?Bbs(>lYaX_l|ENc)$L>l4X|)3aShE|=Ev zR5QqCSJ>$+#MUjgI-2lmnbKY}?ZPzw4_!`>gVxYN( zan&*kf)Q<2(`}j+QP6}`@lHKpRs2%(Ov045#b+bySkmkGtQl+x-?TJO3buxlo?&s! G>7zg3;aCL# diff --git a/src/utils/transport.py b/src/utils/transport.py deleted file mode 100644 index 13d08ae..0000000 --- a/src/utils/transport.py +++ /dev/null @@ -1,676 +0,0 @@ -""" - This file implements data transport stuctures in order to allow data to be moved to and from anywhere - We can thus read data from disk and write to the cloud,queue, or couchdb or SQL -""" -from flask import request, session -import os -import pika -import json -import numpy as np -from couchdbkit import Server -import re -from csv import reader -from datetime import datetime -""" - @TODO: Write a process by which the class automatically handles reading and creating a preliminary sample and discovers the meta data -""" -class Reader: - def __init__(self): - self.nrows = 0 - self.xchar = None - - def row_count(self): - content = self.read() - return np.sum([1 for row in content]) - """ - This function determines the most common delimiter from a subset of possible delimiters. It uses a statistical approach to guage the distribution of columns for a given delimiter - """ - def delimiter(self,sample): - - m = {',':[],'\t':[],'|':[],'\x3A':[]} - delim = m.keys() - for row in sample: - for xchar in delim: - if row.split(xchar) > 1: - m[xchar].append(len(row.split(xchar))) - else: - m[xchar].append(0) - - - - # - # The delimiter with the smallest variance, provided the mean is greater than 1 - # This would be troublesome if there many broken records sampled - # - m = {id: np.var(m[id]) for id in m.keys() if m[id] != [] and int(np.mean(m[id]))>1} - index = m.values().index( min(m.values())) - xchar = m.keys()[index] - - return xchar - """ - This function determines the number of columns of a given sample - @pre self.xchar is not None - """ - def col_count(self,sample): - - m = {} - i = 0 - - for row in sample: - row = self.format(row) - id = str(len(row)) - #id = str(len(row.split(self.xchar))) - - if id not in m: - m[id] = 0 - m[id] = m[id] + 1 - - index = m.values().index( max(m.values()) ) - ncols = int(m.keys()[index]) - - - return ncols; - """ - This function will clean records of a given row by removing non-ascii characters - @pre self.xchar is not None - """ - def format (self,row): - - if isinstance(row,list) == False: - # - # We've observed sometimes fields contain delimiter as a legitimate character, we need to be able to account for this and not tamper with the field values (unless necessary) - cols = self.split(row) - #cols = row.split(self.xchar) - else: - cols = row ; - return [ re.sub('[^\x00-\x7F,\n,\r,\v,\b,]',' ',col.strip()).strip().replace('"','') for col in cols] - - #if isinstance(row,list) == False: - # return (self.xchar.join(r)).format('utf-8') - #else: - # return r - """ - This function performs a split of a record and tries to attempt to preserve the integrity of the data within i.e accounting for the double quotes. - @pre : self.xchar is not None - """ - def split (self,row): - - pattern = "".join(["(?:^|",self.xchar,")(\"(?:[^\"]+|\"\")*\"|[^",self.xchar,"]*)"]) - return re.findall(pattern,row.replace('\n','')) - -class Writer: - - def format(self,row,xchar): - if xchar is not None and isinstance(row,list): - return xchar.join(row)+'\n' - elif xchar is None and isinstance(row,dict): - row = json.dumps(row) - return row - """ - It is important to be able to archive data so as to insure that growth is controlled - Nothing in nature grows indefinitely neither should data being handled. - """ - def archive(self): - pass - def flush(self): - pass - -""" - This class is designed to read data from an Http request file handler provided to us by flask - The file will be heald in memory and processed accordingly - NOTE: This is inefficient and can crash a micro-instance (becareful) -""" -class HttpRequestReader(Reader): - def __init__(self,**params): - self.file_length = 0 - try: - - #self.file = params['file'] - #self.file.seek(0, os.SEEK_END) - #self.file_length = self.file.tell() - - #print 'size of file ',self.file_length - self.content = params['file'].readlines() - self.file_length = len(self.content) - except Exception, e: - print "Error ... ",e - pass - - def isready(self): - return self.file_length > 0 - def read(self,size =-1): - i = 1 - for row in self.content: - i += 1 - if size == i: - break - yield row - -""" - This class is designed to write data to a session/cookie -""" -class HttpSessionWriter(Writer): - """ - @param key required session key - """ - def __init__(self,**params): - self.session = params['queue'] - self.session['sql'] = [] - self.session['csv'] = [] - self.tablename = re.sub('..+$','',params['filename']) - self.session['uid'] = params['uid'] - #self.xchar = params['xchar'] - - - def format_sql(self,row): - values = "','".join([col.replace('"','').replace("'",'') for col in row]) - return "".join(["INSERT INTO :table VALUES('",values,"');\n"]).replace(':table',self.tablename) - def isready(self): - return True - def write(self,**params): - label = params['label'] - row = params ['row'] - - if label == 'usable': - self.session['csv'].append(self.format(row,',')) - self.session['sql'].append(self.format_sql(row)) - -""" - This class is designed to read data from disk (location on hard drive) - @pre : isready() == True -""" -class DiskReader(Reader) : - """ - @param path absolute path of the file to be read - """ - def __init__(self,**params): - Reader.__init__(self) - self.path = params['path'] ; - - def isready(self): - return os.path.exists(self.path) - """ - This function reads the rows from a designated location on disk - @param size number of rows to be read, -1 suggests all rows - """ - def read(self,size=-1): - f = open(self.path,'rU') - i = 1 - for row in f: - - i += 1 - if size == i: - break - yield row - f.close() -""" - This function writes output to disk in a designated location -""" -class DiskWriter(Writer): - def __init__(self,**params): - if 'path' in params: - self.path = params['path'] - else: - self.path = None - if 'name' in params: - self.name = params['name']; - else: - self.name = None - if os.path.exists(self.path) == False: - os.mkdir(self.path) - """ - This function determines if the class is ready for execution or not - i.e it determines if the preconditions of met prior execution - """ - def isready(self): - - p = self.path is not None and os.path.exists(self.path) - q = self.name is not None - return p and q - """ - This function writes a record to a designated file - @param label - @param row row to be written - """ - def write(self,**params): - label = params['label'] - row = params['row'] - xchar = None - if 'xchar' is not None: - xchar = params['xchar'] - path = ''.join([self.path,os.sep,label]) - if os.path.exists(path) == False: - os.mkdir(path) ; - path = ''.join([path,os.sep,self.name]) - f = open(path,'a') - row = self.format(row,xchar); - f.write(row) - f.close() -""" - This class hierarchy is designed to handle interactions with a queue server using pika framework (our tests are based on rabbitmq) -""" -class MessageQueue: - def __init__(self,**params): - self.host= params['host'] - self.uid = params['uid'] - self.qid = params['qid'] - - def isready(self): - #self.init() - resp = self.connection is not None and self.connection.is_open - self.close() - return resp - def close(self): - self.channel.close() - self.connection.close() -""" - This class is designed to publish content to an AMQP (Rabbitmq) - The class will rely on pika to implement this functionality - - We will publish information to a given queue for a given exchange -""" - -class QueueWriter(MessageQueue,Writer): - def __init__(self,**params): - #self.host= params['host'] - #self.uid = params['uid'] - #self.qid = params['queue'] - MessageQueue.__init__(self,**params); - - - def init(self,label=None): - properties = pika.ConnectionParameters(host=self.host) - self.connection = pika.BlockingConnection(properties) - self.channel = self.connection.channel() - self.info = self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True) - if label is None: - self.qhandler = self.channel.queue_declare(queue=self.qid,durable=True) - else: - self.qhandler = self.channel.queue_declare(queue=label,durable=True) - - self.channel.queue_bind(exchange=self.uid,queue=self.qhandler.method.queue) - - - - """ - This function writes a stream of data to the a given queue - @param object object to be written (will be converted to JSON) - @TODO: make this less chatty - """ - def write(self,**params): - xchar = None - if 'xchar' in params: - xchar = params['xchar'] - object = self.format(params['row'],xchar) - - label = params['label'] - self.init(label) - _mode = 2 - if isinstance(object,str): - stream = object - _type = 'text/plain' - else: - stream = json.dumps(object) - if 'type' in params : - _type = params['type'] - else: - _type = 'application/json' - - self.channel.basic_publish( - exchange=self.uid, - routing_key=label, - body=stream, - properties=pika.BasicProperties(content_type=_type,delivery_mode=_mode) - ); - self.close() - - def flush(self,label): - self.init(label) - _mode = 1 #-- Non persistent - self.channel.queue_delete( queue=label); - self.close() - -""" - This class will read from a queue provided an exchange, queue and host - @TODO: Account for security and virtualhosts -""" -class QueueReader(MessageQueue,Reader): - """ - @param host host - @param uid exchange identifier - @param qid queue identifier - """ - def __init__(self,**params): - #self.host= params['host'] - #self.uid = params['uid'] - #self.qid = params['qid'] - MessageQueue.__init__(self,**params); - if 'durable' in params : - self.durable = True - else: - self.durable = False - self.size = -1 - self.data = {} - - def init(self,qid): - - properties = pika.ConnectionParameters(host=self.host) - self.connection = pika.BlockingConnection(properties) - self.channel = self.connection.channel() - self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True) - - self.info = self.channel.queue_declare(queue=qid,durable=True) - - - - """ - This is the callback function designed to process the data stream from the queue - - """ - def callback(self,channel,method,header,stream): - r = [] - if re.match("^\{|\[",stream) is not None: - r = json.loads(stream) - else: - - r = stream - - qid = self.info.method.queue - if qid not in self.data : - self.data[qid] = [] - - self.data[qid].append(r) - # - # We stop reading when the all the messages of the queue are staked - # - if self.size == len(self.data[qid]) or len(self.data[qid]) == self.info.method.message_count: - self.close() - - """ - This function will read, the first message from a queue - @TODO: - Implement channel.basic_get in order to retrieve a single message at a time - Have the number of messages retrieved be specified by size (parameter) - """ - def read(self,size=-1): - r = {} - self.size = size - # - # We enabled the reader to be able to read from several queues (sequentially for now) - # The qid parameter will be an array of queues the reader will be reading from - # - for qid in self.qid: - self.init(qid) - # r[qid] = [] - if self.info.method.message_count > 0: - - self.channel.basic_consume(self.callback,queue=qid,no_ack=False); - self.channel.start_consuming() - else: - - pass - #self.close() - # r[qid].append( self.data) - - return self.data -class QueueListener(QueueReader): - def init(self,qid): - properties = pika.ConnectionParameters(host=self.host) - self.connection = pika.BlockingConnection(properties) - self.channel = self.connection.channel() - self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True ) - - self.info = self.channel.queue_declare(exclusive=True,queue=qid) - print self.info.method.queue - self.channel.queue_bind(exchange=self.uid,queue=self.info.method.queue,routing_key=qid) - #self.callback = callback - def read(self): - - self.init(self.qid) - self.channel.basic_consume(self.callback,queue=self.qid,no_ack=True); - self.channel.start_consuming() - -""" - This class is designed to write output as sql insert statements - The class will inherit from DiskWriter with minor adjustments - @TODO: Include script to create the table if need be using the upper bound of a learner -""" -class SQLDiskWriter(DiskWriter): - def __init__(self,**args): - DiskWriter.__init__(self,**args) - self.tablename = re.sub('\..+$','',self.name).replace(' ','_') - """ - @param label - @param row - @param xchar - """ - def write(self,**args): - label = args['label'] - row = args['row'] - - if label == 'usable': - values = "','".join([col.replace('"','').replace("'",'') for col in row]) - row = "".join(["INSERT INTO :table VALUES('",values,"');\n"]).replace(':table',self.tablename) - - args['row'] = row - DiskWriter.write(self,**args) -class Couchdb: - """ - @param uri host & port reference - @param uid user id involved - - @param dbname database name (target) - """ - def __init__(self,**args): - uri = args['uri'] - self.uid = args['uid'] - dbname = args['dbname'] - self.server = Server(uri=uri) - self.dbase = self.server.get_db(dbname) - if self.dbase.doc_exist(self.uid) == False: - self.dbase.save_doc({"_id":self.uid}) - """ - Insuring the preconditions are met for processing - """ - def isready(self): - p = self.server.info() != {} - if p == False or self.dbase.dbname not in self.server.all_dbs(): - return False - # - # At this point we are sure that the server is connected - # We are also sure that the database actually exists - # - q = self.dbase.doc_exist(self.uid) - if q == False: - return False - return True - -""" - This function will read an attachment from couchdb and return it to calling code. The attachment must have been placed before hand (otherwise oops) - @T: Account for security & access control -""" -class CouchdbReader(Couchdb,Reader): - """ - @param filename filename (attachment) - """ - def __init__(self,**args): - # - # setting the basic parameters for - Couchdb.__init__(self,**args) - if 'filename' in args : - self.filename = args['filename'] - else: - self.filename = None - - def isready(self): - # - # Is the basic information about the database valid - # - p = Couchdb.isready(self) - - if p == False: - return False - # - # The database name is set and correct at this point - # We insure the document of the given user has the requested attachment. - # - - doc = self.dbase.get(self.uid) - - if '_attachments' in doc: - r = self.filename in doc['_attachments'].keys() - - else: - r = False - - return r - def stream(self): - content = self.dbase.fetch_attachment(self.uid,self.filename).split('\n') ; - i = 1 - for row in content: - yield row - if size > 0 and i == size: - break - i = i + 1 - - def read(self,size=-1): - if self.filename is not None: - self.stream() - else: - return self.basic_read() - def basic_read(self): - document = self.dbase.get(self.uid) - del document['_id'], document['_rev'] - return document -""" - This class will write on a couchdb document provided a scope - The scope is the attribute that will be on the couchdb document -""" -class CouchdbWriter(Couchdb,Writer): - """ - @param uri host & port reference - @param uid user id involved - @param filename filename (attachment) - @param dbname database name (target) - """ - def __init__(self,**args): - uri = args['uri'] - self.uid = args['uid'] - if 'filename' in args: - self.filename = args['filename'] - else: - self.filename = None - dbname = args['dbname'] - self.server = Server(uri=uri) - self.dbase = self.server.get_db(dbname) - # - # If the document doesn't exist then we should create it - # - - """ - write a given attribute to a document database - @param label scope of the row repair|broken|fixed|stats - @param row row to be written - """ - def write(self,**params): - - document = self.dbase.get(self.uid) - label = params['label'] - row = params['row'] - if label not in document : - document[label] = [] - document[label].append(row) - self.dbase.save_doc(document) - def flush(self,**params) : - - size = params['size'] - has_changed = False - document = self.dbase.get(self.uid) - for key in document: - if key not in ['_id','_rev','_attachments'] : - content = document[key] - else: - continue - if isinstance(content,list): - index = len(content) - size - content = content[index:] - document[key] = content - - else: - document[key] = {} - has_changed = True - if has_changed: - self.dbase.save_doc(document) - - def archive(self,params=None): - document = self.dbase.get(self.uid) - content = {} - _doc = {} - for id in document: - if id in ['_id','_rev','_attachments'] : - _doc[id] = document[id] - else: - content[id] = document[id] - - content = json.dumps(content) - document= _doc - now = str(datetime.today()) - - name = '-'.join([document['_id'] , now,'.json']) - self.dbase.save_doc(document) - self.dbase.put_attachment(document,content,name,'application/json') -""" - This class acts as a factory to be able to generate an instance of a Reader/Writer - Against a Queue,Disk,Cloud,Couchdb - The class doesn't enforce parameter validation, thus any error with the parameters sent will result in a null Object -""" -class DataSourceFactory: - def instance(self,**args): - source = args['type'] - params = args['args'] - anObject = None - - if source in ['HttpRequestReader','HttpSessionWriter']: - # - # @TODO: Make sure objects are serializable, be smart about them !! - # - aClassName = ''.join([source,'(**params)']) - - - else: - - stream = json.dumps(params) - aClassName = ''.join([source,'(**',stream,')']) - try: - - - anObject = eval( aClassName) - #setattr(anObject,'name',source) - except Exception,e: - print ['Error ',e] - return anObject -""" - This class implements a data-source handler that is intended to be used within the context of data processing, it allows to read/write anywhere transparently. - The class is a facade to a heterogeneous class hierarchy and thus simplifies how the calling code interacts with the class hierarchy -""" -class DataSource: - def __init__(self,sourceType='Disk',outputType='Disk',params={}): - self.Input = DataSourceFactory.instance(type=sourceType,args=params) - self.Output= DataSourceFactory.instance(type=outputType,args=params) - def read(self,size=-1): - return self.Input.read(size) - def write(self,**args): - self.Output.write(**args) -#p = {} -#p['host'] = 'dev.the-phi.com' -#p['uid'] = 'nyemba@gmail.com' -#p['qid'] = 'repair' -#factory = DataSourceFactory() -#o = factory.instance(type='QueueReader',args=p) -#print o is None -#q = QueueWriter(host='dev.the-phi.com',uid='nyemba@gmail.com') -#q.write(object='steve') -#q.write(object='nyemba') -#q.write(object='elon') - - diff --git a/src/utils/transport.pyc b/src/utils/transport.pyc index 20a22bb617c1498a7fcf5d20df38bd82ea28d12b..4e8415efefab1796109913b1bdd447c521166c27 100644 GIT binary patch delta 17 ZcmZ3qoN>`|MmFZpyj=Dc8``|MmFZpyj;snH?qwM1^_vn1}Ojl