parent b160d0a295
commit 0cf56f3e8f
@@ -1,111 +0,0 @@
"""
This file implements Databricks handling; the functionality relies on databricks-sql-connector.

LICENSE (MIT)
Copyright 2016-2020, The Phi Technology LLC

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

@TODO:
    - Migrate SQLite to SQL hierarchy
    - Include Write in Chunks from pandas
"""
import os
import sqlalchemy
from transport.common import Reader, Writer
import pandas as pd


class Bricks:
    """
    :host
    :token
    :database
    :cluster_path
    :table
    """
    def __init__(self, **_args):
        _host = _args['host']
        _token = _args['token']
        _cluster_path = _args['cluster_path']
        self._schema = _args['schema'] if 'schema' in _args else _args['database']
        _catalog = _args['catalog']
        self._table = _args['table'] if 'table' in _args else None
        #
        # @TODO:
        #   Sometimes the cluster isn't up and running and takes a while to start; the user should be alerted of this
        #
        _uri = f'''databricks://token:{_token}@{_host}?http_path={_cluster_path}&catalog={_catalog}&schema={self._schema}'''
        self._engine = sqlalchemy.create_engine(_uri)
        pass
    def meta(self, **_args):
        table = _args['table'] if 'table' in _args else self._table
        if not table:
            return []
        else:
            if sqlalchemy.__version__.startswith('1.'):
                _m = sqlalchemy.MetaData(bind=self._engine)
                _m.reflect(only=[table])
            else:
                _m = sqlalchemy.MetaData()
                _m.reflect(bind=self._engine)
            #
            # Let's retrieve the information associated with a table
            #
            return [{'name': _attr.name, 'type': _attr.type} for _attr in _m.tables[table].columns]

    def has(self, **_args):
        return self.meta(**_args)
    def apply(self, _sql):
        try:
            if _sql.lower().startswith('select'):
                return pd.read_sql(_sql, self._engine)
        except Exception as e:
            pass

class BricksReader(Bricks, Reader):
    """
    This class is designed for reads and will execute reads against a table name or a select SQL statement
    """
    def __init__(self, **_args):
        super().__init__(**_args)
    def read(self, **_args):
        limit = None if 'limit' not in _args else str(_args['limit'])

        if 'sql' in _args:
            sql = _args['sql']
        elif 'table' in _args:
            table = _args['table']
            sql = f'SELECT * FROM {table}'
            if limit:
                sql = sql + f' LIMIT {limit}'

        if 'sql' in _args or 'table' in _args:
            return self.apply(sql)
        else:
            return pd.DataFrame()
        pass
class BricksWriter(Bricks, Writer):
    def __init__(self, **_args):
        super().__init__(**_args)
    def write(self, _data, **_args):
        """
        This function writes data to Databricks against a given table. If the table is not specified upon initialization, it can be specified here
        _data:  data frame to push to Databricks
        _args:  chunks, table, schema
        """
        _schema = self._schema if 'schema' not in _args else _args['schema']
        _table = self._table if 'table' not in _args else _args['table']
        _df = _data
        if type(_df) == dict:
            _df = [_df]
        if type(_df) == list:
            _df = pd.DataFrame(_df)
        _df.to_sql(
            name=_table, schema=_schema,
            con=self._engine, if_exists='append', index=False)
        pass
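# --- Usage sketch (not part of the original file) ---
# A hedged example of the reader/writer pair above; every connection value is a
# hypothetical placeholder and the keyword names simply mirror the Bricks constructor.
def _example_bricks_roundtrip():
    _args = {
        'host': 'adb-0000000000000000.0.azuredatabricks.net',   # placeholder workspace host
        'token': 'dapi-xxxxxxxxxxxxxxxx',                        # placeholder access token
        'cluster_path': '/sql/1.0/warehouses/abc123',            # placeholder http_path
        'catalog': 'hive_metastore',                             # placeholder catalog
        'database': 'default',
        'table': 'people'
    }
    reader = BricksReader(**_args)
    df = reader.read(limit=10)        # issues SELECT * FROM people LIMIT 10
    writer = BricksWriter(**_args)
    writer.write(df)                  # appends the frame back into the same table
    return df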
@@ -1,151 +0,0 @@
"""
Data Transport - 1.0
Steve L. Nyemba, The Phi Technology LLC

This module is designed to serve as a wrapper to a set of supported data stores :
    - couchdb
    - mongodb
    - Files (character delimited)
    - Queues (RabbitMQ)
    - Session (Flask)
    - s3
The supported operations are read/write and providing meta data to the calling code
Requirements :
    pymongo
    boto
    cloudant
@TODO:
    Enable reading/writing against multiple stores at once
"""
__author__ = 'The Phi Technology'
import numpy as np
import json
import importlib
from multiprocessing import RLock
import queue
# import couch
# import mongo
from datetime import datetime


class IO:
    def init(self, **args):
        """
        This function enables attributes to be changed at runtime. Only the attributes defined in the class can be changed;
        adding attributes would require sub-classing, otherwise we may end up with an unpredictable class ...
        """
        allowed = list(vars(self).keys())
        for field in args:
            if field not in allowed:
                continue
            value = args[field]
            setattr(self, field, value)

class IEncoder(json.JSONEncoder):
    def default(self, object):
        # isinstance is used so concrete numpy scalar types (np.int64, np.float32, ...) are handled
        if isinstance(object, np.integer):
            return int(object)
        elif isinstance(object, np.floating):
            return float(object)
        elif isinstance(object, np.ndarray):
            return object.tolist()
        elif isinstance(object, datetime):
            return object.isoformat()
        else:
            return super(IEncoder, self).default(object)
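# --- Usage sketch (not part of the original file) ---
# A hedged example of handing IEncoder to json.dumps so numpy/datetime values serialize cleanly;
# the record below is made up.
def _example_iencoder():
    record = {'count': np.int64(3), 'score': np.float64(0.75), 'at': datetime.now()}
    return json.dumps(record, cls=IEncoder)   # e.g. {"count": 3, "score": 0.75, "at": "2024-..."}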
class Reader(IO):
    """
    This class is an abstraction of the read functionality of a data store
    """
    def __init__(self):
        pass
    def meta(self, **_args):
        """
        This function is intended to return meta-data associated with what has just been read
        @return object of meta data information associated with the content of the store
        """
        raise Exception("meta function needs to be implemented")
    def read(self, **args):
        """
        This function is intended to read the content of a store provided parameters to be used at the discretion of the subclass
        """
        raise Exception("read function needs to be implemented")


class Writer(IO):
    def __init__(self):
        self.cache = {"default": []}
    def log(self, id="default", **args):
        # NOTE: the original signature omitted `id`, which made the cache key the builtin id() function
        self.cache[id] = args
    def meta(self, id="default", **args):
        raise Exception("meta function needs to be implemented")
    def format(self, row, xchar):
        if xchar is not None and isinstance(row, list):
            return xchar.join(row) + '\n'
        elif xchar is None and isinstance(row, dict):
            row = json.dumps(row)
        return row
    def write(self, **args):
        """
        This function will write content to a store given parameters to be used at the discretion of the sub-class
        """
        raise Exception("write function needs to be implemented")

    def archive(self):
        """
        It is important to be able to archive data so as to ensure that growth is controlled.
        Nothing in nature grows indefinitely and neither should the data being handled.
        """
        raise Exception("archive function needs to be implemented")
    def close(self):
        """
        This function will close the persistent storage connection/handler
        """
        pass
class ReadWriter(Reader, Writer):
    """
    This class aggregates the read and write functions
    """
    pass
# class Console(Writer):
#     lock = RLock()
#     def __init__(self,**_args):
#         self.lock = _args['lock'] if 'lock' in _args else False
#         self.info = self.write
#         self.debug = self.write
#         self.log = self.write
#         pass
#     def write (self,logs=None,**_args):
#         if self.lock :
#             Console.lock.acquire()
#         try:
#             _params = _args if logs is None and _args else logs
#             if type(_params) == list:
#                 for row in _params :
#                     print (row)
#             else:
#                 print (_params)
#         except Exception as e :
#             print (e)
#         finally:
#             if self.lock :
#                 Console.lock.release()


"""
@NOTE : Experimental !!
"""
class Proxy:
    """
    This class will forward a call to a function that is provided by the user code
    """
    def __init__(self, **_args):
        self.callback = _args['callback']
    def read(self, **_args):
        try:
            return self.callback(**_args)
        except Exception as e:
            return self.callback()
    def write(self, data, **_args):
        self.callback(data, **_args)
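# --- Usage sketch (not part of the original file) ---
# A hedged example of wiring the experimental Proxy to user code; the callback is a stand-in.
def _example_proxy():
    def _fetch(**kwargs):
        # stand-in for user code; returns whatever the caller considers 'data'
        return kwargs.get('rows', [])
    p = Proxy(callback=_fetch)
    return p.read(rows=[{'id': 1}])   # -> [{'id': 1}]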
@@ -1,269 +0,0 @@
import os
import sys


if sys.version_info[0] > 2:
    from transport.common import Reader, Writer  # , factory
else:
    from common import Reader, Writer
# import nujson as json
import json
# from threading import Lock
import sqlite3
import pandas as pd
from multiprocessing import Lock
from transport.common import Reader, Writer, IEncoder
import sqlalchemy
from sqlalchemy import create_engine

class DiskReader(Reader):
    """
    This class is designed to read data from disk (a location on the hard drive)
    @pre : isready() == True
    """

    def __init__(self, **params):
        """
        @param  path    absolute path of the file to be read
        """

        Reader.__init__(self)
        self.path = params['path'] if 'path' in params else None
        self.delimiter = params['delimiter'] if 'delimiter' in params else ','

    def isready(self):
        return os.path.exists(self.path)
    def meta(self, **_args):
        return []
    def read(self, **args):
        _path = self.path if 'path' not in args else args['path']
        _delimiter = self.delimiter if 'delimiter' not in args else args['delimiter']
        return pd.read_csv(_path, delimiter=_delimiter)
    def stream(self, **args):
        """
        This function reads the rows from a designated location on disk
        @param  size    number of rows to be read, -1 suggests all rows
        """

        size = -1 if 'size' not in args else int(args['size'])
        f = open(self.path, 'r')
        i = 1
        for row in f:

            i += 1
            if size == i:
                break
            if self.delimiter:
                yield row.split(self.delimiter)
            else:
                yield row
        f.close()
class DiskWriter(Writer):

    """
    This class writes output to a designated location on disk, one text record at a time
        - If a delimiter is provided it will use that to generate an xchar-delimited file
        - If not then the object will be dumped as is
    """
    THREAD_LOCK = Lock()
    def __init__(self, **params):
        super().__init__()
        self._path = params['path']
        self._delimiter = params['delimiter'] if 'delimiter' in params else None
        self._mode = 'w' if 'mode' not in params else params['mode']
    # def meta(self):
    #     return self.cache['meta']
    # def isready(self):
    #     """
    #     This function determines if the class is ready for execution or not
    #     i.e it determines if the preconditions are met prior to execution
    #     """
    #     return True
    #     # p = self.path is not None and os.path.exists(self.path)
    #     # q = self.name is not None
    #     # return p and q
    # def format (self,row):
    #     self.cache['meta']['cols'] += len(row) if isinstance(row,list) else len(row.keys())
    #     self.cache['meta']['rows'] += 1
    #     return (self.delimiter.join(row) if self.delimiter else json.dumps(row))+"\n"
    def write(self, info, **_args):
        """
        This function writes a record to a designated file
        @param  label   <passed|broken|fixed|stats>
        @param  row     row to be written
        """
        try:

            DiskWriter.THREAD_LOCK.acquire()

            _delim = self._delimiter if 'delimiter' not in _args else _args['delimiter']
            _path = self._path if 'path' not in _args else _args['path']
            _mode = self._mode if 'mode' not in _args else _args['mode']
            info.to_csv(_path, index=False, sep=_delim)
            pass
        except Exception as e:
            #
            # Not sure what should be done here ...
            pass
        finally:
            DiskWriter.THREAD_LOCK.release()
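# --- Usage sketch (not part of the original file) ---
# A hedged round trip with the two classes above; the path is a placeholder.
def _example_disk_roundtrip():
    _path = '/tmp/example.csv'   # placeholder path
    _df = pd.DataFrame([{'name': 'alice', 'age': 30}, {'name': 'bob', 'age': 41}])
    DiskWriter(path=_path, delimiter=',').write(_df)
    return DiskReader(path=_path).read()   # uses the default ',' delimiter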
class SQLite:
    def __init__(self, **_args):
        self.path = _args['database'] if 'database' in _args else _args['path']
        self.conn = sqlite3.connect(self.path, isolation_level="IMMEDIATE")
        self.conn.row_factory = sqlite3.Row
        self.fields = _args['fields'] if 'fields' in _args else []
    def has(self, **_args):
        found = False
        try:
            if 'table' in _args:
                table = _args['table']
                sql = "SELECT * FROM :table LIMIT 1".replace(":table", table)
                _df = pd.read_sql(sql, self.conn)
                found = _df.columns.size > 0
        except Exception as e:
            pass
        return found
    def close(self):
        try:
            self.conn.close()
        except Exception as e:
            print(e)
    def apply(self, sql):
        try:
            if not sql.lower().startswith('select'):
                cursor = self.conn.cursor()
                cursor.execute(sql)
                cursor.close()
                self.conn.commit()
            else:
                return pd.read_sql(sql, self.conn)
        except Exception as e:
            print(e)
class SQLiteReader(SQLite, DiskReader):
    def __init__(self, **args):
        super().__init__(**args)
        # DiskReader.__init__(self,**args)
        # self.path = args['database'] if 'database' in args else args['path']
        # self.conn = sqlite3.connect(self.path,isolation_level=None)
        # self.conn.row_factory = sqlite3.Row
        self.table = args['table'] if 'table' in args else None
    def read(self, **args):
        if 'sql' in args:
            sql = args['sql']
        elif 'filter' in args:
            sql = " ".join(["SELECT :fields FROM", self.table, "WHERE (:filter)"]).replace(":filter", args['filter'])
            sql = sql.replace(":fields", args['fields']) if 'fields' in args else sql.replace(":fields", "*")
        else:
            sql = ' '.join(['SELECT * FROM ', self.table])
        if 'limit' in args:
            sql = sql + " LIMIT " + args['limit']
        return pd.read_sql(sql, self.conn)
    def close(self):
        try:
            self.conn.close()
        except Exception as e:
            pass

class SQLiteWriter(SQLite, DiskWriter):
    connection = None
    LOCK = Lock()
    def __init__(self, **args):
        """
        :path
        :fields json|csv
        """
        # DiskWriter.__init__(self,**args)
        super().__init__(**args)
        self.table = args['table'] if 'table' in args else None
        path = self.path
        self._engine = create_engine(f'sqlite:///{path}')

        # self.conn = sqlite3.connect(self.path,isolation_level="IMMEDIATE")
        # self.conn.row_factory = sqlite3.Row
        # self.fields = args['fields'] if 'fields' in args else []

        if self.fields and not self.isready() and self.table:
            self.init(self.fields)
        SQLiteWriter.connection = self.conn
    def init(self, fields):
        self.fields = fields
        sql = " ".join(["CREATE TABLE IF NOT EXISTS ", self.table, " (", ",".join(self.fields), ")"])

        cursor = self.conn.cursor()
        cursor.execute(sql)
        cursor.close()
        self.conn.commit()
    def isready(self):
        try:
            sql = "SELECT count(*) FROM sqlite_master WHERE name=':table'"
            sql = sql.replace(":table", self.table)
            cursor = self.conn.cursor()

            r = cursor.execute(sql)
            r = r.fetchall()
            cursor.close()

            return r[0][0] != 0
        except Exception as e:
            pass
        return 0
        #
        # If the table doesn't exist we should create it
        #
    # def write(self,_data,**_args):
    #     SQLiteWriter.LOCK.acquire()
    #     try:
    #         if type(_data) == dict :
    #             _data = [_data]
    #         _table = self.table if 'table' not in _args else _args['table']
    #         _df = pd.DataFrame(_data)
    #         _df.to_sql(_table,self._engine.connect(),if_exists='append',index=False)
    #     except Exception as e:
    #         print (e)
    #     SQLiteWriter.LOCK.release()
    def write(self, info, **_args):
        """
        """

        # if not self.fields :
        #     # if type(info) == pd.DataFrame :
        #     #     _columns = list(info.columns)
        #     # self.init(list(info.keys()))

        if type(info) == dict:
            info = [info]
        elif type(info) == pd.DataFrame:
            info = info.fillna('')
            info = info.to_dict(orient='records')

        if not self.fields:
            _rec = info[0]
            self.init(list(_rec.keys()))

        SQLiteWriter.LOCK.acquire()
        try:

            cursor = self.conn.cursor()
            sql = " ".join(["INSERT INTO ", self.table, "(", ",".join(self.fields), ")", "values(:values)"])
            for row in info:
                values = [str(row[field]) if type(row[field]) not in [list, dict] else json.dumps(row[field], cls=IEncoder) for field in self.fields]
                values = ["".join(["'", value, "'"]) for value in values]

                # stream =["".join(["",value,""]) if type(value) == str else value for value in row.values()]
                # stream = json.dumps(stream,cls=IEncoder)
                # stream = stream.replace("[","").replace("]","")

                # print (sql.replace(":values",stream))
                # self.conn.execute(sql.replace(":values",stream) )
                self.conn.execute(sql.replace(":values", ",".join(values)))
                # cursor.commit()

            self.conn.commit()
            # print (sql)
        except Exception as e:
            print(e)
            pass
        SQLiteWriter.LOCK.release()
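# --- Usage sketch (not part of the original file) ---
# A hedged example of the SQLite reader/writer above against a throwaway database file;
# the path, table and records are placeholders.
def _example_sqlite_roundtrip():
    _db = '/tmp/example.sqlite'                                         # placeholder database file
    w = SQLiteWriter(path=_db, table='logs', fields=['name', 'value'])  # creates the table if missing
    w.write([{'name': 'cpu', 'value': 0.42}, {'name': 'mem', 'value': 0.81}])
    w.close()
    r = SQLiteReader(path=_db, table='logs')
    _df = r.read(filter="name = 'cpu'")
    r.close()
    return _df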
@@ -1,279 +0,0 @@
"""
Data Transport - 1.0
Steve L. Nyemba, The Phi Technology LLC

This file is a wrapper around a rabbitmq server for reading and writing content to a queue (exchange)

"""
import pika
from datetime import datetime
import re
import json
import os
import sys
if sys.version_info[0] > 2:
    from transport.common import Reader, Writer
else:
    from common import Reader, Writer
import json
from multiprocessing import RLock
class MessageQueue:
    """
    This class hierarchy is designed to handle interactions with a queue server using the pika framework (our tests are based on rabbitmq)
    :host
    :xid    identifier of the exchange
    :qid    identifier of the queue
    """
    def __init__(self, **params):
        self.host = 'localhost' if 'host' not in params else params['host']    #-- location of the queue server
        self.port = 5672 if 'port' not in params else params['port']
        self.virtual_host = '/' if 'vhost' not in params else params['vhost']
        self.exchange = params['exchange'] if 'exchange' in params else 'amq.direct'   #-- exchange
        self.queue = params['queue'] if 'queue' in params else 'demo'
        self.connection = None
        self.channel = None

        self.name = self.__class__.__name__.lower() if 'name' not in params else params['name']

        username = password = None
        if 'username' in params:
            username = params['username']
            password = params['password']
        if 'auth_file' in params:
            _info = json.loads((open(params['auth_file'])).read())
            username = _info['username']
            password = _info['password']
            self.virtual_host = _info['virtual_host'] if 'virtual_host' in _info else self.virtual_host
            self.exchange = _info['exchange'] if 'exchange' in _info else self.exchange
            self.queue = _info['queue'] if 'queue' in _info else self.queue

        self.credentials = pika.PlainCredentials('guest', 'guest')
        if 'username' in params:
            self.credentials = pika.PlainCredentials(
                params['username'],
                ('' if 'password' not in params else params['password'])
            )

    def init(self, label=None):
        properties = pika.ConnectionParameters(host=self.host, port=self.port, virtual_host=self.virtual_host,
            client_properties={'connection_name': self.name},
            credentials=self.credentials)
        self.connection = pika.BlockingConnection(properties)
        self.channel = self.connection.channel()
        self.info = self.channel.exchange_declare(exchange=self.exchange, exchange_type='direct', durable=True)
        if label is None:
            self.qhandler = self.channel.queue_declare(queue=self.queue, durable=True)
        else:
            self.qhandler = self.channel.queue_declare(queue=label, durable=True)

        self.channel.queue_bind(exchange=self.exchange, queue=self.qhandler.method.queue)

    def isready(self):
        # self.init()
        resp = self.connection is not None and self.connection.is_open
        # self.close()
        return resp
    def finalize(self):
        pass
    def close(self):
        if self.connection.is_closed == False:
            self.channel.close()
            self.connection.close()

class QueueWriter(MessageQueue, Writer):
    """
    This class is designed to publish content to an AMQP queue (RabbitMQ)
    The class will rely on pika to implement this functionality

    We will publish information to a given queue for a given exchange
    """
    def __init__(self, **params):
        # self.host= params['host']
        # self.exchange = params['uid']
        # self.queue = params['queue']
        MessageQueue.__init__(self, **params)
        self.init()

    def write(self, data, _type='text/plain'):
        """
        This function writes a stream of data to a given queue
        @param  object  object to be written (will be converted to JSON)
        @TODO: make this less chatty
        """

        stream = json.dumps(data) if isinstance(data, dict) else data
        self.channel.basic_publish(
            exchange=self.exchange,
            routing_key=self.queue,
            body=stream,
            properties=pika.BasicProperties(content_type=_type, delivery_mode=2)
        )
        # self.close()

    def flush(self):
        self.init()
        _mode = 1   #-- Non persistent
        self.channel.queue_delete(queue=self.queue)
        self.close()
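# --- Usage sketch (not part of the original file) ---
# A hedged publish example assuming a local RabbitMQ broker with default guest credentials;
# the exchange and queue names are placeholders.
def _example_publish():
    q = QueueWriter(host='localhost', exchange='amq.direct', queue='demo')
    q.write({'event': 'ping', 'at': '2024-01-01T00:00:00'})
    q.close()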
class QueueReader(MessageQueue, Reader):
    """
    This class will read from a queue provided an exchange, queue and host
    @TODO: Account for security and virtual hosts
    """

    def __init__(self, **params):
        """
        @param  host    host
        @param  uid     exchange identifier
        @param  qid     queue identifier
        """

        # self.host= params['host']
        # self.exchange = params['uid']
        # self.queue = params['qid']
        MessageQueue.__init__(self, **params)
        # self.init()
        self.durable = False if 'durable' not in params else params['durable']
        # if 'durable' in params :
        #     self.durable = True
        # else:
        #     self.durable = False
        self.size = -1
        self.data = {}
    # def init(self,qid):

    #     properties = pika.ConnectionParameters(host=self.host)
    #     self.connection = pika.BlockingConnection(properties)
    #     self.channel = self.connection.channel()
    #     self.channel.exchange_declare(exchange=self.exchange,type='direct',durable=True)

    #     self.info = self.channel.queue_declare(queue=qid,durable=True)

    def callback(self, channel, method, header, stream):
        """
        This is the callback function designed to process the data stream from the queue
        """

        r = []
        # if re.match("^\{|\[",stream) is not None:
        if stream.startswith(b'{') or stream.startswith(b'['):
            r = json.loads(stream)
        else:
            r = stream

        qid = self.qhandler.method.queue
        if qid not in self.data:
            self.data[qid] = []

        self.data[qid].append(r)
        #
        # We stop reading once all the messages of the queue have been stacked
        #
        if self.size == len(self.data[qid]) or len(self.data[qid]) == self.info.method.message_count:
            self.close()

    def read(self, **args):
        """
        This function will read the first message from a queue
        @TODO:
            Implement channel.basic_get in order to retrieve a single message at a time
            Have the number of messages retrieved be specified by size (parameter)
        """
        r = {}
        self.size = -1 if 'size' not in args else int(args['size'])
        #
        # We enabled the reader to be able to read from several queues (sequentially for now)
        # The qid parameter will be an array of queues the reader will be reading from
        #
        if isinstance(self.queue, str):
            self.queue = [self.queue]

        for qid in self.queue:
            self.init(qid)
            # r[qid] = []

            if self.qhandler.method.message_count > 0:

                self.channel.basic_consume(queue=qid, on_message_callback=self.callback, auto_ack=False)
                self.channel.start_consuming()
            else:

                pass
                # self.close()
            # r[qid].append( self.data)

        return self.data
class QueueListener(MessageQueue):
    lock = RLock()
    """
    This class is designed to have an active listener (worker) against a specified Exchange/Queue
    It is initialized as would any other object and will require a callback function to address the objects returned.
    """
    def __init__(self, **args):
        MessageQueue.__init__(self, **args)
        self.listen = self.read
        self.apply = args['apply'] if 'apply' in args else print
        self.lock = False if 'lock' not in args else args['lock']

    def finalize(self, channel, ExceptionReason):
        pass

    def callback(self, channel, method, header, stream):
        _info = {}
        # if re.match("^\{|\[",stream) is not None:

        if stream.startswith(b"[") or stream.startswith(b"{"):
            _info = json.loads(stream)
        else:

            _info = stream
        #
        # At this point we should invoke the apply function with a lock if need be
        # @TODO: Establish a vocabulary

        if stream == b'QUIT':
            # channel.exit()
            self.close()
        if self.lock == True:
            QueueListener.lock.acquire()
        try:
            #
            # In case the user has not specified a function to apply the data against, it will simply be printed
            #
            self.apply(_info)
        except Exception as e:
            pass
        if self.lock == True:
            QueueListener.lock.release()
    def read(self):

        self.init(self.queue)

        self.channel.basic_consume(self.queue, self.callback, auto_ack=True)
        self.channel.start_consuming()


class Factory:
    @staticmethod
    def instance(**_args):
        """
        :param count    number of workers
        :param apply    function workers
        """
        # NOTE: this factory assumes numpy (np) and the top-level transport.factory are importable from this module
        _apply = _args['apply']
        _count = _args['count']
        for i in np.arange(_count):
            _name = _args['name'] if 'name' in _args else 'worker_' + str(i)
            transport.factory.instance(provider="rabbit", context="listener", apply=_apply, auth_file=_args['auth_file'])
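# --- Usage sketch (not part of the original file) ---
# A hedged example of attaching a listener with a custom apply function; broker details are placeholders.
def _example_listen():
    def handle(message):
        print('received:', message)
    listener = QueueListener(host='localhost', queue='demo', apply=handle)
    listener.listen()   # alias of read(); blocks and consumes until the connection closes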
@@ -1,130 +0,0 @@
"""
Data Transport - 1.0
Steve L. Nyemba, The Phi Technology LLC

This file is a wrapper around an s3 bucket provided by AWS for reading and writing content
"""
from datetime import datetime
import boto
from boto.s3.connection import S3Connection, OrdinaryCallingFormat
import numpy as np
import botocore
from smart_open import smart_open
import sys
if sys.version_info[0] > 2:
    from transport.common import Reader, Writer
else:
    from common import Reader, Writer
import json
from io import StringIO
import json


class s3:
    """
    @TODO: Implement a search function for a file given a bucket??
    """
    def __init__(self, **args):
        """
        This function will extract a file or set of files from the s3 bucket provided
        @param access_key
        @param secret_key
        @param path     location of the file
        @param filter   filename or filtering elements
        """
        try:
            self.s3 = S3Connection(args['access_key'], args['secret_key'], calling_format=OrdinaryCallingFormat())
            self.bucket = self.s3.get_bucket(args['bucket'].strip(), validate=False) if 'bucket' in args else None
            # self.path = args['path']
            self.filter = args['filter'] if 'filter' in args else None
            self.filename = args['file'] if 'file' in args else None
            self.bucket_name = args['bucket'] if 'bucket' in args else None

        except Exception as e:
            self.s3 = None
            self.bucket = None
            print(e)
    def meta(self, **args):
        """
        :name   name of the bucket
        """
        info = self.list(**args)
        [item.open() for item in info]
        return [{"name": item.name, "size": item.size} for item in info]
    def list(self, **args):
        """
        This function will list the content of a bucket; the bucket must be provided by name
        :name   name of the bucket
        """
        return list(self.s3.get_bucket(args['name']).list())


    def buckets(self):
        #
        # This function will return all buckets; not sure why, but it should be used cautiously
        # depending on why the s3 infrastructure is used
        #
        return [item.name for item in self.s3.get_all_buckets()]

    # def buckets(self):
        pass
    #     """
    #     This function is a wrapper around the bucket list of buckets for s3
    #     """
    #     return self.s3.get_all_buckets()


class s3Reader(s3, Reader):
    """
    Because s3 contains buckets and files, reading becomes a tricky proposition :
        - list files if file is None
        - stream content if file is not None
    @TODO: support read from all buckets, think about it
    """
    def __init__(self, **args):
        s3.__init__(self, **args)
    def files(self):
        r = []
        try:
            return [item.name for item in self.bucket if item.size > 0]
        except Exception as e:
            pass
        return r
    def stream(self, limit=-1):
        """
        At this point we should stream a file from a given bucket
        """
        key = self.bucket.get_key(self.filename.strip())
        if key is None:
            yield None
        else:
            count = 0
            with smart_open(key) as remote_file:
                for line in remote_file:
                    if count == limit and limit > 0:
                        break
                    yield line
                    count += 1
    def read(self, **args):
        if self.filename is None:
            #
            # returning the list of files because no single file was specified.
            return self.files()
        else:
            limit = args['size'] if 'size' in args else -1
            return self.stream(limit)

class s3Writer(s3, Writer):

    def __init__(self, **args):
        s3.__init__(self, **args)
    def mkdir(self, name):
        """
        This function will create a folder in a bucket
        :name   name of the folder
        """
        # NOTE: put_object follows the boto3 client API, while self.s3 above is a legacy boto S3Connection
        self.s3.put_object(Bucket=self.bucket_name, key=(name + '/'))
    def write(self, content):
        file = StringIO(content.decode("utf8"))
        # NOTE: upload_fileobj also follows the boto3 client API
        self.s3.upload_fileobj(file, self.bucket_name, self.filename)
        pass
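# --- Usage sketch (not part of the original file) ---
# A hedged read example against a bucket; the keys and bucket name are placeholders and the
# legacy boto credential style used by the constructor above is assumed.
def _example_s3_read():
    args = {'access_key': 'AKIA...', 'secret_key': '...', 'bucket': 'my-bucket', 'file': 'data/sample.csv'}
    reader = s3Reader(**args)
    names = reader.files()               # names of non-empty objects in the bucket
    lines = list(reader.read(size=5))    # streams at most 5 lines of data/sample.csv
    return names, lines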