diff --git a/setup.py b/setup.py index 67ecfc4..dd4b292 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ def read(fname): return open(os.path.join(os.path.dirname(__file__), fname)).read() args = { "name":"data-transport", - "version":"1.5.0", + "version":"1.5.2", "author":"The Phi Technology LLC","author_email":"info@the-phi.com", "license":"MIT", "packages":["transport"]} diff --git a/transport/__init__.py b/transport/__init__.py index 6822138..86d7fce 100644 --- a/transport/__init__.py +++ b/transport/__init__.py @@ -57,25 +57,27 @@ import os class factory : TYPE = {"sql":{"providers":["postgresql","mysql","neteeza","bigquery","mariadb","redshift"]}} PROVIDERS = { - "etl":{"class":{"read":etl.instance}}, + "etl":{"class":{"read":etl.instance,"write":etl.instance}}, "console":{"class":{"write":Console,"read":Console}}, "file":{"class":{"read":disk.DiskReader,"write":disk.DiskWriter}}, "sqlite":{"class":{"read":disk.SQLiteReader,"write":disk.SQLiteWriter}}, - "postgresql":{"port":5432,"host":"localhost","database":os.environ['USER'],"driver":pg,"default":{"type":"VARCHAR"}}, - "redshift":{"port":5432,"host":"localhost","database":os.environ['USER'],"driver":pg,"default":{"type":"VARCHAR"}}, + "postgresql":{"port":5432,"host":"localhost","database":os.environ['USER'],"driver":pg,"default":{"type":"VARCHAR"},"class":{"read":sql.SQLReader,"write":sql.SQLWriter}}, + "redshift":{"port":5432,"host":"localhost","database":os.environ['USER'],"driver":pg,"default":{"type":"VARCHAR"},"class":{"read":sql.SQLReader,"write":sql.SQLWriter}}, "bigquery":{"class":{"read":sql.BQReader,"write":sql.BQWriter}}, - "mysql":{"port":3306,"host":"localhost","default":{"type":"VARCHAR(256)"},"driver":my}, - "mariadb":{"port":3306,"host":"localhost","default":{"type":"VARCHAR(256)"},"driver":my}, + "mysql":{"port":3306,"host":"localhost","default":{"type":"VARCHAR(256)"},"driver":my,"class":{"read":sql.SQLReader,"write":sql.SQLWriter}}, + "mariadb":{"port":3306,"host":"localhost","default":{"type":"VARCHAR(256)"},"driver":my,"class":{"read":sql.SQLReader,"write":sql.SQLWriter}}, "mongo":{"port":27017,"host":"localhost","class":{"read":mongo.MongoReader,"write":mongo.MongoWriter}}, "couch":{"port":5984,"host":"localhost","class":{"read":couch.CouchReader,"write":couch.CouchWriter}}, - "netezza":{"port":5480,"driver":nz,"default":{"type":"VARCHAR(256)"}}, - "rabbitmq":{"port":5672,"host":"localhost","class":{"read":queue.QueueReader,"write":queue.QueueWriter,"listen":queue.QueueListener},"default":{"type":"application/json"}}} + "netezza":{"port":5480,"driver":nz,"default":{"type":"VARCHAR(256)"},"class":{"read":sql.SQLReader,"write":sql.SQLWriter}}, + "rabbitmq":{"port":5672,"host":"localhost","class":{"read":queue.QueueReader,"write":queue.QueueWriter,"listen":queue.QueueListener,"listener":queue.QueueListener},"default":{"type":"application/json"}}} # # creating synonyms PROVIDERS['mongodb'] = PROVIDERS['mongo'] PROVIDERS['couchdb'] = PROVIDERS['couch'] PROVIDERS['bq'] = PROVIDERS['bigquery'] PROVIDERS['sqlite3'] = PROVIDERS['sqlite'] + PROVIDERS['rabbit'] = PROVIDERS['rabbitmq'] + PROVIDERS['rabbitmq-server'] = PROVIDERS['rabbitmq'] @staticmethod def instance(**_args): diff --git a/transport/common.py b/transport/common.py index a41e46b..e6578a6 100644 --- a/transport/common.py +++ b/transport/common.py @@ -113,32 +113,3 @@ class Console(Writer): if self.lock : Console.lock.release() -# class factory : -# @staticmethod -# def instance(**args): -# """ -# This class will create an instance of a transport when providing -# :type name of the type we are trying to create -# :args The arguments needed to create the instance -# """ -# source = args['type'] -# params = args['args'] -# anObject = None - -# if source in ['HttpRequestReader','HttpSessionWriter']: -# # -# # @TODO: Make sure objects are serializable, be smart about them !! -# # -# aClassName = ''.join([source,'(**params)']) - - -# else: - -# stream = json.dumps(params) -# aClassName = ''.join([source,'(**',stream,')']) -# try: -# anObject = eval( aClassName) -# #setattr(anObject,'name',source) -# except Exception,e: -# print ['Error ',e] -# return anObject \ No newline at end of file diff --git a/transport/mongo.py b/transport/mongo.py index fd2c5b8..8f593c3 100644 --- a/transport/mongo.py +++ b/transport/mongo.py @@ -41,11 +41,21 @@ class Mongo : self._lock = False if 'lock' not in args else args['lock'] - if 'user' in args and 'password' in args: + username = password = None + if 'username' in args and 'password' in args: + username = args['username'] + password=args['password'] + if 'auth_file' in args : + _info = json.loads((open(args['auth_file'])).read()) + username = _info['username'] + password = _info['password'] + authSource=(args['authSource'] if 'authSource' in args else self.dbname) + + if username and password : self.client = MongoClient(host, - username=args['username'] , - password=args['password'] , - authSource=(args['authSource'] if 'authSource' in args else self.dbname), + username=username, + password=password , + authSource=authSource, authMechanism='SCRAM-SHA-256') else: self.client = MongoClient(host,maxPoolSize=10000) diff --git a/transport/rabbitmq.py b/transport/rabbitmq.py index 68c5c5b..a56393b 100644 --- a/transport/rabbitmq.py +++ b/transport/rabbitmq.py @@ -16,7 +16,7 @@ if sys.version_info[0] > 2 : else: from common import Reader, Writer import json - +from multiprocessing import RLock class MessageQueue: """ This class hierarchy is designed to handle interactions with a queue server using pika framework (our tests are based on rabbitmq) @@ -29,12 +29,23 @@ class MessageQueue: self.port= 5672 if 'port' not in params else params['port'] self.virtual_host = '/' if 'vhost' not in params else params['vhost'] self.exchange = params['exchange'] if 'exchange' in params else 'amq.direct' #-- exchange - self.queue = params['queue'] + self.queue = params['queue'] if 'queue' in params else 'demo' self.connection = None self.channel = None - self.name = self.__class__.__name__.lower() if 'name' not in params else 'wtf' + self.name = self.__class__.__name__.lower() if 'name' not in params else params['name'] + username = password = None + if 'username' in params : + username = params['username'] + password = params['password'] + if 'auth_file' in params : + _info = json.loads((open(params['auth_file'])).read()) + username=_info['username'] + password=_info['password'] + self.virtual_host = _info['virtual_host'] if 'virtual_host' in _info else self.virtual_host + self.exchange = _info['exchange'] if 'exchange' in _info else self.exchange + self.queue = _info['queue'] if 'queue' in _info else self.queue self.credentials= pika.PlainCredentials('guest','guest') if 'username' in params : @@ -44,7 +55,9 @@ class MessageQueue: ) def init(self,label=None): - properties = pika.ConnectionParameters(host=self.host,port=self.port,virtual_host=self.virtual_host,credentials=self.credentials) + properties = pika.ConnectionParameters(host=self.host,port=self.port,virtual_host=self.virtual_host, + client_properties={'connection_name':self.name}, + credentials=self.credentials) self.connection = pika.BlockingConnection(properties) self.channel = self.connection.channel() self.info = self.channel.exchange_declare(exchange=self.exchange,exchange_type='direct',durable=True) @@ -93,23 +106,7 @@ class QueueWriter(MessageQueue,Writer): @param object object to be written (will be converted to JSON) @TODO: make this less chatty """ - # xchar = None - # if 'xchar' in params: - # xchar = params['xchar'] - # object = self.format(params['row'],xchar) - - # label = params['label'] - # self.init(label) - # _mode = 2 - # if isinstance(object,str): - # stream = object - # _type = 'text/plain' - # else: - # stream = json.dumps(object) - # if 'type' in params : - # _type = params['type'] - # else: - # _type = 'application/json' + stream = json.dumps(data) if isinstance(data,dict) else data self.channel.basic_publish( exchange=self.exchange, @@ -143,10 +140,11 @@ class QueueReader(MessageQueue,Reader): #self.queue = params['qid'] MessageQueue.__init__(self,**params); # self.init() - if 'durable' in params : - self.durable = True - else: - self.durable = False + self.durable = False if 'durable' not in params else params['durable'] + # if 'durable' in params : + # self.durable = True + # else: + # self.durable = False self.size = -1 self.data = {} # def init(self,qid): @@ -166,7 +164,8 @@ class QueueReader(MessageQueue,Reader): """ r = [] - if re.match("^\{|\[",stream) is not None: + # if re.match("^\{|\[",stream) is not None: + if stream.startswith(b'{') or stream.startswith(b'['): r = json.loads(stream) else: @@ -215,6 +214,7 @@ class QueueReader(MessageQueue,Reader): return self.data class QueueListener(MessageQueue): + lock = RLock() """ This class is designed to have an active listener (worker) against a specified Exchange/Queue It is initialized as would any other object and will require a callback function to address the objects returned. @@ -223,6 +223,7 @@ class QueueListener(MessageQueue): MessageQueue.__init__(self,**args) self.listen = self.read self.apply = args['apply'] if 'apply' in args else print + self.lock = False if 'lock' not in args else args['lock'] def finalize(self,channel,ExceptionReason): pass @@ -231,12 +232,30 @@ class QueueListener(MessageQueue): _info= {} # if re.match("^\{|\[",stream) is not None: + if stream.startswith(b"[") or stream.startswith(b"{"): _info = json.loads(stream) else: _info = stream - self.apply(_info) + # + # At this point we should invoke the apply function with a lock if need be + # @TODO: Establish a vocabulary + + if stream == b'QUIT' : + # channel.exit() + self.close() + if self.lock == True : + QueueListener.lock.acquire() + try: + # + # In case the user has not specified a function to apply the data against, it will simply be printed + # + self.apply(_info) + except Exception as e: + pass + if self.lock == True : + QueueListener.lock.release() def read(self): self.init(self.queue) @@ -246,3 +265,15 @@ class QueueListener(MessageQueue): +class Factory : + @staticmethod + def instance(**_args): + """ + :param count number of workers + :param apply function workers + """ + _apply = _args['apply'] + _count = _args['count'] + for i in np.arange(_count) : + _name = _args['name'] if 'name' in _args else 'worker_'+str(i) + transport.factory.instance(provider="rabbit",context="listener",apply=_apply,auth_file=_args['auth_file']) \ No newline at end of file diff --git a/transport/session.py b/transport/session.py index 5ca833a..915d2b5 100644 --- a/transport/session.py +++ b/transport/session.py @@ -5,11 +5,11 @@ from common import Reader, Writer import json class HttpRequestReader(Reader): - """ - This class is designed to read data from an Http request file handler provided to us by flask - The file will be heald in memory and processed accordingly - NOTE: This is inefficient and can crash a micro-instance (becareful) - """ + """ + This class is designed to read data from an Http request file handler provided to us by flask + The file will be heald in memory and processed accordingly + NOTE: This is inefficient and can crash a micro-instance (becareful) + """ def __init__(self,**params): self.file_length = 0 @@ -22,8 +22,8 @@ class HttpRequestReader(Reader): #print 'size of file ',self.file_length self.content = params['file'].readlines() self.file_length = len(self.content) - except Exception, e: - print "Error ... ",e + except Exception as e: + print ("Error ... ",e) pass def isready(self): @@ -37,13 +37,13 @@ class HttpRequestReader(Reader): yield row class HttpSessionWriter(Writer): - """ - This class is designed to write data to a session/cookie - """ + """ + This class is designed to write data to a session/cookie + """ def __init__(self,**params): - """ - @param key required session key - """ + """ + @param key required session key + """ self.session = params['queue'] self.session['sql'] = [] self.session['csv'] = [] diff --git a/transport/sql.py b/transport/sql.py index 52a676c..d2b0b36 100644 --- a/transport/sql.py +++ b/transport/sql.py @@ -64,6 +64,17 @@ class SQLRW : key = 'username' if 'username' in _args else 'user' _info['user'] = _args[key] _info['password'] = _args['password'] if 'password' in _args else '' + if 'auth_file' in _args : + _auth = json.loads( open(_args['auth_file']).read() ) + key = 'username' if 'username' in _auth else 'user' + _info['user'] = _auth[key] + _info['password'] = _auth['password'] if 'password' in _auth else '' + + _info['host'] = _auth['host'] if 'host' in _auth else _info['host'] + _info['port'] = _auth['port'] if 'port' in _auth else _info['port'] + if 'database' in _auth: + _info['dbname'] = _auth['database'] + self.table = _auth['table'] if 'table' in _auth else self.table # # We need to load the drivers here to see what we are dealing with ...