Basic user subscription & plan functions. Including retrieval of plans

legacy
Steve L. Nyemba 8 years ago
parent 110f297279
commit 2e5c5c77c8

@ -4,6 +4,7 @@
import stripe
from couchdbkit import Server, Document
import json
from sets import Set
class User:
def __init__(self,db,stripe=None,stripeToken=None) :
@ -11,50 +12,88 @@ class User:
self.stripe = stripe;
self.stripeToken= stripeToken
self.user = None
"""
This function will cast an object to JSON,
It is designed to address an inherent limitation of stripe object hierarchy
"""
def cast(self,object) :
return json.loads(json.dumps(object, sort_keys=True, indent=2))
"""
This function creates a stripe customer, and saves a copy with us for internal use
We use couchdb as a cache of sorts
"""
def init (self,uid,plans=None):
def init (self,uid,plans):
customer = {}
if self.db.doc_exist(uid) :
self.user = self.db.get(uid)
# self.hasPlan(uid,plan)
if self.db.doc_exist(uid) == False and plans and stripe :
has_plan = True
if self.user is None and plans and stripe :
#
# First time customer, register them and sign them up for the first plan
#
customer['source'] = str(self.stripeToken)
customer = self.stripe.Customer.create(
source=self.stripeToken,
email=uid
) ;
has_plan = False
id = customer['id']
subscriptions = self.subscribe(id,plans)
self.user = {"_id":uid,"id":id}
else:
#
# The user exists but let's see if the user is subscribed to this plan
# If she is and the plan is still live then there is nothing to do
#
self.user = self.db.get(uid) ;
id = self.user['id']
if uid and self.db.doc_exist(uid) == False:
info = {'id':id,'created':customer.created}
info['_id'] = uid.strip();
lsub = self.subscriptions()
lplans = [str(item['plan']['id']) for item in lsub.data if item.ended_at in [None,""]]
x_plans = [item['id'] for item in plans]
info['subscriptions'] = subscriptions
r = self.db.save_doc(info) ;
if r['ok'] :
self.user = info;
if lplans and not set(x_plans) - set(lplans) :
has_plans = False
x = list(set(x_plans) - set(lplans))
plans = [ item for item in plans if item.id in x]
else:
#
# @TODO: update 4if the a plan was provided
self.user = self.db.get(uid) ;
r = stripe.Customer.retrieve(self.user['id'])
self.user['subscriptions'] = r.subscriptions.data
self.db.save_doc(self.user)
has_plan = False
# lsub = lsub.data
#
# At this point We should either subscribe the user or not
#
if has_plan == False :
r = self.subscribe(id,plans)
lsub.data.append(r[0])
lsub = self.cast(lsub.data)
#
# We need to save the document & the information
#
self.user['subscriptions'] = lsub
self.db.save_doc(self.user)
def subscriptions(self):
return self.user['subscriptions']
#
# call stripe to retrieve subscriptions for this user ...
#
if self.user :
r = stripe.Customer.retrieve(self.user['id'])
return r.subscriptions
else:
return []
"""
This function subscribes a customer to n-plans

@ -1,3 +1,8 @@
"""
This file handles customer & plans associated with a given product/app
The subscription works as follows:
-
"""
from __future__ import division
from flask import Flask, request, session, render_template,Response
import Domain
@ -7,8 +12,10 @@ import json
from StringIO import StringIO
import re
import os
PORT = 8100 ;
path = os.environ['CONFIG']
from utils.params import PARAMS
from utils.transport import Couchdb, CouchdbReader
PORT = 8100 if 'port' not in PARAMS else int(PARAMS['port']) ;
path = PARAMS['path'] #os.environ['CONFIG']
f = open(path)
CONFIG = json.loads(f.read())
stripe_keys = {
@ -27,54 +34,52 @@ app = Flask(__name__)
"""
COUCHDB = Server(uri=CONFIG['couchdb']['uri']) ;
"""
This function subscribes a user to a given service for an application
This function guarantees not to duplicate subscriptions
@resource name name of the application {cloud-music}
@header key service/plan
"""
@app.route('/subscribe/<app_name>',methods=['POST'])
def subscribe(app_name):
#
# The name is the full name of the service
#
@app.route('/init',methods=['POST'])
def init():
DB = COUCHDB.get_db(CONFIG['couchdb']['db']) ;
out_resp = '0'
if 'User-Info' in request.headers:
stream = request.headers.get('User-Info') ;
user = json.loads(stream) ;
session['user-info'] = user ;
#
# We should create the user in couchdb
uid = user['uid'] ;
session['uid'] = uid
key = request.headers['key']
uid = request.headers['uid']
plans = stripe.Plan.list().data
plan = [item for item in plans if item.id == key]
resp = "0"
if plan :
couch_handler = Couchdb(uri=CONFIG['couchdb']['uri'],dbname=app_name,uid=uid)
DB = couch_handler.dbase
handler = Domain.User(DB,stripe) ;
"""
registering the user with us and making sure we can keep track of her basic operations & access to features
At this point this is just a user i.e a consumer (not store owner), we will tally track on the type of user later on
"""
if handler.exists(uid) == False:
#
# We need to subscribe the user to the freemium services if any at all
# Subscribing to a freemium plan hopefully this enables basic features of the application
#
free = []
paid = []
plans = [plan for plan in stripe.Plan.list().data if plan.amount == 0 ]
handler.init(uid,plans) ;
out_resp = session['user-info']
elif 'subscriptions' not in session:
#
# We have the user we should return the user's information
#
handler.init(uid)
out_resp = handler.subscriptions()
#
# updating user-info with more information ...
#
session['user-info'] = handler.user;
out_resp = CONFIG['stripe']['pub'].strip()
# return '1'
elif 'user-info' in session:
out_resp = CONFIG['stripe']['pub'].strip()
return out_resp
handler.init(uid,plan)
resp = plan[0].id
return resp
"""
This function returns the meta data about a given plan or set of plans for a given application
@resource app_name application identifier
@header pid plan identifier (optional)
@header uid user identifier
"""
@app.route('/get/info/<app_name>',methods=['GET'])
def get_plan_info(app_name) :
uid = request.headers['uid']
pid = request.headers['pid'] if 'pid' in request.headers else None
couchdb = CouchdbReader(uri=CONFIG['couchdb']['uri'],dbname=app_name,uid=uid,create=False)
info = couchdb.read()
lsub = info['subscriptions']
plans = [ sub['plan'] for sub in lsub if sub['ended_at'] is None ]
if pid is not None :
plans = [item['metadata'] for item in plans if item['id'] == pid]
return json.dumps(plans)
@app.route('/subscribe/<name>',methods=['DELETE'])
def cancel_subscribe(name) :
pass
"""
This function defines if a given user is a customer or not
We should be able to tell by how we create customers
@ -158,7 +163,15 @@ def buy():
@app.route('/plans',methods=['GET'])
def plans():
plans = stripe.Plan.list().data
plans = [{"id":item.id,"price":item.amount/100,"feature":item.metadata['info'],"trial":item.trial_period_days} for item in plans if re.match('cloudplayer',item.id)]
if 'filter' in request.headers:
filter = request.headers['filter']
plans = [ item for item in plans if re.match(filter,item.name)]
else:
#
# Let's get a user's subscription information
#
uid = request.headers['uid']
if 'uid' in request.headers and request.headers['uid'] != '':
uid = request.headers['uid']
@ -176,22 +189,33 @@ def plans():
This function subscribes a user to a given plan(s)
If the plan is new, then we do NOT need a credit card info
@header app application/requesting service
@body info {user:_id,plan:[]}
"""
@app.route('/subscribe',methods=['POST'])
def subscribe():
@app.route('/_subscribe',methods=['POST'])
def _subscribe():
plans = [{"id":id} for id in request.get_json(silent=True)]
user = session['user-info']
if 'user-info' not in session:
info = request.get_json(silent=True)
user = info['user']
plans = info['plans']
else:
plans = [{"id":id} for id in request.get_json(silent=True)]
user = session['user-info']
app = request.headers['app']
#
# @TODO:
# This should be handled by transport layer ...
#
DB = COUCHDB.get_db(CONFIG['couchdb']['db']) ;
handler = Domain.User(DB,stripe)
r = handler.subscribe(user['id'],plans)
return json.dumps(r)
app.debug = True ;
app.secret_key = '360-8y-[0v@t10n]+kr81v17y'
app.run(port=PORT,threaded=True)
if __name__ == '__main__' :
app.debug = True ;
app.secret_key = '360-8y-[0v@t10n]+kr81v17y'
app.run(port=PORT,threaded=True)

@ -0,0 +1,19 @@
import sys
PARAMS = {'context':''}
if len(sys.argv) > 1:
N = len(sys.argv)
for i in range(1,N):
value = None
if sys.argv[i].startswith('--'):
key = sys.argv[i].replace('-','')
if i + 1 < N:
value = sys.argv[i + 1] = sys.argv[i+1].strip()
if key and value:
PARAMS[key] = value
if key == 'context':
PARAMS[key] = ('/'+value).replace('//','/')
i += 2

@ -0,0 +1,675 @@
"""
This file implements data transport stuctures in order to allow data to be moved to and from anywhere
We can thus read data from disk and write to the cloud,queue, or couchdb or SQL
"""
from flask import request, session
import os
import pika
import json
import numpy as np
from couchdbkit import Server
import re
from csv import reader
from datetime import datetime
"""
@TODO: Write a process by which the class automatically handles reading and creating a preliminary sample and discovers the meta data
"""
class Reader:
def __init__(self):
self.nrows = 0
self.xchar = None
def row_count(self):
content = self.read()
return np.sum([1 for row in content])
"""
This function determines the most common delimiter from a subset of possible delimiters. It uses a statistical approach to guage the distribution of columns for a given delimiter
"""
def delimiter(self,sample):
m = {',':[],'\t':[],'|':[],'\x3A':[]}
delim = m.keys()
for row in sample:
for xchar in delim:
if row.split(xchar) > 1:
m[xchar].append(len(row.split(xchar)))
else:
m[xchar].append(0)
#
# The delimiter with the smallest variance, provided the mean is greater than 1
# This would be troublesome if there many broken records sampled
#
m = {id: np.var(m[id]) for id in m.keys() if m[id] != [] and int(np.mean(m[id]))>1}
index = m.values().index( min(m.values()))
xchar = m.keys()[index]
return xchar
"""
This function determines the number of columns of a given sample
@pre self.xchar is not None
"""
def col_count(self,sample):
m = {}
i = 0
for row in sample:
row = self.format(row)
id = str(len(row))
#id = str(len(row.split(self.xchar)))
if id not in m:
m[id] = 0
m[id] = m[id] + 1
index = m.values().index( max(m.values()) )
ncols = int(m.keys()[index])
return ncols;
"""
This function will clean records of a given row by removing non-ascii characters
@pre self.xchar is not None
"""
def format (self,row):
if isinstance(row,list) == False:
#
# We've observed sometimes fields contain delimiter as a legitimate character, we need to be able to account for this and not tamper with the field values (unless necessary)
cols = self.split(row)
#cols = row.split(self.xchar)
else:
cols = row ;
return [ re.sub('[^\x00-\x7F,\n,\r,\v,\b,]',' ',col.strip()).strip().replace('"','') for col in cols]
#if isinstance(row,list) == False:
# return (self.xchar.join(r)).format('utf-8')
#else:
# return r
"""
This function performs a split of a record and tries to attempt to preserve the integrity of the data within i.e accounting for the double quotes.
@pre : self.xchar is not None
"""
def split (self,row):
pattern = "".join(["(?:^|",self.xchar,")(\"(?:[^\"]+|\"\")*\"|[^",self.xchar,"]*)"])
return re.findall(pattern,row.replace('\n',''))
class Writer:
def format(self,row,xchar):
if xchar is not None and isinstance(row,list):
return xchar.join(row)+'\n'
elif xchar is None and isinstance(row,dict):
row = json.dumps(row)
return row
"""
It is important to be able to archive data so as to insure that growth is controlled
Nothing in nature grows indefinitely neither should data being handled.
"""
def archive(self):
pass
def flush(self):
pass
"""
This class is designed to read data from an Http request file handler provided to us by flask
The file will be heald in memory and processed accordingly
NOTE: This is inefficient and can crash a micro-instance (becareful)
"""
class HttpRequestReader(Reader):
def __init__(self,**params):
self.file_length = 0
try:
#self.file = params['file']
#self.file.seek(0, os.SEEK_END)
#self.file_length = self.file.tell()
#print 'size of file ',self.file_length
self.content = params['file'].readlines()
self.file_length = len(self.content)
except Exception, e:
print "Error ... ",e
pass
def isready(self):
return self.file_length > 0
def read(self,size =-1):
i = 1
for row in self.content:
i += 1
if size == i:
break
yield row
"""
This class is designed to write data to a session/cookie
"""
class HttpSessionWriter(Writer):
"""
@param key required session key
"""
def __init__(self,**params):
self.session = params['queue']
self.session['sql'] = []
self.session['csv'] = []
self.tablename = re.sub('..+$','',params['filename'])
self.session['uid'] = params['uid']
#self.xchar = params['xchar']
def format_sql(self,row):
values = "','".join([col.replace('"','').replace("'",'') for col in row])
return "".join(["INSERT INTO :table VALUES('",values,"');\n"]).replace(':table',self.tablename)
def isready(self):
return True
def write(self,**params):
label = params['label']
row = params ['row']
if label == 'usable':
self.session['csv'].append(self.format(row,','))
self.session['sql'].append(self.format_sql(row))
"""
This class is designed to read data from disk (location on hard drive)
@pre : isready() == True
"""
class DiskReader(Reader) :
"""
@param path absolute path of the file to be read
"""
def __init__(self,**params):
Reader.__init__(self)
self.path = params['path'] ;
def isready(self):
return os.path.exists(self.path)
"""
This function reads the rows from a designated location on disk
@param size number of rows to be read, -1 suggests all rows
"""
def read(self,size=-1):
f = open(self.path,'rU')
i = 1
for row in f:
i += 1
if size == i:
break
yield row
f.close()
"""
This function writes output to disk in a designated location
"""
class DiskWriter(Writer):
def __init__(self,**params):
if 'path' in params:
self.path = params['path']
else:
self.path = None
if 'name' in params:
self.name = params['name'];
else:
self.name = None
if os.path.exists(self.path) == False:
os.mkdir(self.path)
"""
This function determines if the class is ready for execution or not
i.e it determines if the preconditions of met prior execution
"""
def isready(self):
p = self.path is not None and os.path.exists(self.path)
q = self.name is not None
return p and q
"""
This function writes a record to a designated file
@param label <passed|broken|fixed|stats>
@param row row to be written
"""
def write(self,**params):
label = params['label']
row = params['row']
xchar = None
if 'xchar' is not None:
xchar = params['xchar']
path = ''.join([self.path,os.sep,label])
if os.path.exists(path) == False:
os.mkdir(path) ;
path = ''.join([path,os.sep,self.name])
f = open(path,'a')
row = self.format(row,xchar);
f.write(row)
f.close()
"""
This class hierarchy is designed to handle interactions with a queue server using pika framework (our tests are based on rabbitmq)
"""
class MessageQueue:
def __init__(self,**params):
self.host= params['host']
self.uid = params['uid']
self.qid = params['qid']
def isready(self):
#self.init()
resp = self.connection is not None and self.connection.is_open
self.close()
return resp
def close(self):
self.channel.close()
self.connection.close()
"""
This class is designed to publish content to an AMQP (Rabbitmq)
The class will rely on pika to implement this functionality
We will publish information to a given queue for a given exchange
"""
class QueueWriter(MessageQueue,Writer):
def __init__(self,**params):
#self.host= params['host']
#self.uid = params['uid']
#self.qid = params['queue']
MessageQueue.__init__(self,**params);
def init(self,label=None):
properties = pika.ConnectionParameters(host=self.host)
self.connection = pika.BlockingConnection(properties)
self.channel = self.connection.channel()
self.info = self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True)
if label is None:
self.qhandler = self.channel.queue_declare(queue=self.qid,durable=True)
else:
self.qhandler = self.channel.queue_declare(queue=label,durable=True)
self.channel.queue_bind(exchange=self.uid,queue=self.qhandler.method.queue)
"""
This function writes a stream of data to the a given queue
@param object object to be written (will be converted to JSON)
@TODO: make this less chatty
"""
def write(self,**params):
xchar = None
if 'xchar' in params:
xchar = params['xchar']
object = self.format(params['row'],xchar)
label = params['label']
self.init(label)
_mode = 2
if isinstance(object,str):
stream = object
_type = 'text/plain'
else:
stream = json.dumps(object)
if 'type' in params :
_type = params['type']
else:
_type = 'application/json'
self.channel.basic_publish(
exchange=self.uid,
routing_key=label,
body=stream,
properties=pika.BasicProperties(content_type=_type,delivery_mode=_mode)
);
self.close()
def flush(self,label):
self.init(label)
_mode = 1 #-- Non persistent
self.channel.queue_delete( queue=label);
self.close()
"""
This class will read from a queue provided an exchange, queue and host
@TODO: Account for security and virtualhosts
"""
class QueueReader(MessageQueue,Reader):
"""
@param host host
@param uid exchange identifier
@param qid queue identifier
"""
def __init__(self,**params):
#self.host= params['host']
#self.uid = params['uid']
#self.qid = params['qid']
MessageQueue.__init__(self,**params);
if 'durable' in params :
self.durable = True
else:
self.durable = False
self.size = -1
self.data = {}
def init(self,qid):
properties = pika.ConnectionParameters(host=self.host)
self.connection = pika.BlockingConnection(properties)
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True)
self.info = self.channel.queue_declare(queue=qid,durable=True)
"""
This is the callback function designed to process the data stream from the queue
"""
def callback(self,channel,method,header,stream):
r = []
if re.match("^\{|\[",stream) is not None:
r = json.loads(stream)
else:
r = stream
qid = self.info.method.queue
if qid not in self.data :
self.data[qid] = []
self.data[qid].append(r)
#
# We stop reading when the all the messages of the queue are staked
#
if self.size == len(self.data[qid]) or len(self.data[qid]) == self.info.method.message_count:
self.close()
"""
This function will read, the first message from a queue
@TODO:
Implement channel.basic_get in order to retrieve a single message at a time
Have the number of messages retrieved be specified by size (parameter)
"""
def read(self,size=-1):
r = {}
self.size = size
#
# We enabled the reader to be able to read from several queues (sequentially for now)
# The qid parameter will be an array of queues the reader will be reading from
#
for qid in self.qid:
self.init(qid)
# r[qid] = []
if self.info.method.message_count > 0:
self.channel.basic_consume(self.callback,queue=qid,no_ack=False);
self.channel.start_consuming()
else:
pass
#self.close()
# r[qid].append( self.data)
return self.data
class QueueListener(QueueReader):
def init(self,qid):
properties = pika.ConnectionParameters(host=self.host)
self.connection = pika.BlockingConnection(properties)
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True )
self.info = self.channel.queue_declare(exclusive=True,queue=qid)
self.channel.queue_bind(exchange=self.uid,queue=self.info.method.queue,routing_key=qid)
#self.callback = callback
def read(self):
self.init(self.qid)
self.channel.basic_consume(self.callback,queue=self.qid,no_ack=True);
self.channel.start_consuming()
"""
This class is designed to write output as sql insert statements
The class will inherit from DiskWriter with minor adjustments
@TODO: Include script to create the table if need be using the upper bound of a learner
"""
class SQLDiskWriter(DiskWriter):
def __init__(self,**args):
DiskWriter.__init__(self,**args)
self.tablename = re.sub('\..+$','',self.name).replace(' ','_')
"""
@param label
@param row
@param xchar
"""
def write(self,**args):
label = args['label']
row = args['row']
if label == 'usable':
values = "','".join([col.replace('"','').replace("'",'') for col in row])
row = "".join(["INSERT INTO :table VALUES('",values,"');\n"]).replace(':table',self.tablename)
args['row'] = row
DiskWriter.write(self,**args)
class Couchdb:
"""
@param uri host & port reference
@param uid user id involved
@param dbname database name (target)
"""
def __init__(self,**args):
uri = args['uri']
self.uid = args['uid']
dbname = args['dbname']
self.server = Server(uri=uri)
self.dbase = self.server.get_db(dbname)
create = args['create'] if 'create' in args else False
if self.dbase.doc_exist(self.uid) == False and create == True:
self.dbase.save_doc({"_id":self.uid})
"""
Insuring the preconditions are met for processing
"""
def isready(self):
p = self.server.info() != {}
if p == False or self.dbase.dbname not in self.server.all_dbs():
return False
#
# At this point we are sure that the server is connected
# We are also sure that the database actually exists
#
q = self.dbase.doc_exist(self.uid)
return p and q
"""
This function will read an attachment from couchdb and return it to calling code. The attachment must have been placed before hand (otherwise oops)
@T: Account for security & access control
"""
class CouchdbReader(Couchdb,Reader):
"""
@param filename filename (attachment)
"""
def __init__(self,**args):
#
# setting the basic parameters for
Couchdb.__init__(self,**args)
if 'filename' in args :
self.filename = args['filename']
else:
self.filename = None
def isready(self):
#
# Is the basic information about the database valid
#
p = Couchdb.isready(self)
if p == False:
return False
#
# The database name is set and correct at this point
# We insure the document of the given user has the requested attachment.
#
doc = self.dbase.get(self.uid)
q = doc is not None
if '_attachments' in doc:
r = self.filename in doc['_attachments'].keys()
else:
r = True
return p and q and r
def stream(self):
content = self.dbase.fetch_attachment(self.uid,self.filename).split('\n') ;
i = 1
for row in content:
yield row
if size > 0 and i == size:
break
i = i + 1
def read(self,size=-1):
if self.filename is not None:
self.stream()
else:
return self.basic_read()
def basic_read(self):
document = self.dbase.get(self.uid)
del document['_id'], document['_rev']
return document
"""
This class will write on a couchdb document provided a scope
The scope is the attribute that will be on the couchdb document
"""
class CouchdbWriter(Couchdb,Writer):
"""
@param uri host & port reference
@param uid user id involved
@param filename filename (attachment)
@param dbname database name (target)
"""
def __init__(self,**args):
uri = args['uri']
self.uid = args['uid']
if 'filename' in args:
self.filename = args['filename']
else:
self.filename = None
dbname = args['dbname']
self.server = Server(uri=uri)
self.dbase = self.server.get_db(dbname)
#
# If the document doesn't exist then we should create it
#
"""
write a given attribute to a document database
@param label scope of the row repair|broken|fixed|stats
@param row row to be written
"""
def write(self,**params):
document = self.dbase.get(self.uid)
label = params['label']
row = params['row']
if label not in document :
document[label] = []
document[label].append(row)
self.dbase.save_doc(document)
def flush(self,**params) :
size = params['size']
has_changed = False
document = self.dbase.get(self.uid)
for key in document:
if key not in ['_id','_rev','_attachments'] :
content = document[key]
else:
continue
if isinstance(content,list):
index = len(content) - size
content = content[index:]
document[key] = content
else:
document[key] = {}
has_changed = True
if has_changed:
self.dbase.save_doc(document)
def archive(self,params=None):
document = self.dbase.get(self.uid)
content = {}
_doc = {}
for id in document:
if id in ['_id','_rev','_attachments'] :
_doc[id] = document[id]
else:
content[id] = document[id]
content = json.dumps(content)
document= _doc
now = str(datetime.today())
name = '-'.join([document['_id'] , now,'.json'])
self.dbase.save_doc(document)
self.dbase.put_attachment(document,content,name,'application/json')
"""
This class acts as a factory to be able to generate an instance of a Reader/Writer
Against a Queue,Disk,Cloud,Couchdb
The class doesn't enforce parameter validation, thus any error with the parameters sent will result in a null Object
"""
class DataSourceFactory:
def instance(self,**args):
source = args['type']
params = args['args']
anObject = None
if source in ['HttpRequestReader','HttpSessionWriter']:
#
# @TODO: Make sure objects are serializable, be smart about them !!
#
aClassName = ''.join([source,'(**params)'])
else:
stream = json.dumps(params)
aClassName = ''.join([source,'(**',stream,')'])
try:
anObject = eval( aClassName)
#setattr(anObject,'name',source)
except Exception,e:
print ['Error ',e]
return anObject
"""
This class implements a data-source handler that is intended to be used within the context of data processing, it allows to read/write anywhere transparently.
The class is a facade to a heterogeneous class hierarchy and thus simplifies how the calling code interacts with the class hierarchy
"""
class DataSource:
def __init__(self,sourceType='Disk',outputType='Disk',params={}):
self.Input = DataSourceFactory.instance(type=sourceType,args=params)
self.Output= DataSourceFactory.instance(type=outputType,args=params)
def read(self,size=-1):
return self.Input.read(size)
def write(self,**args):
self.Output.write(**args)
#p = {}
#p['host'] = 'dev.the-phi.com'
#p['uid'] = 'nyemba@gmail.com'
#p['qid'] = 'repair'
#factory = DataSourceFactory()
#o = factory.instance(type='QueueReader',args=p)
#print o is None
#q = QueueWriter(host='dev.the-phi.com',uid='nyemba@gmail.com')
#q.write(object='steve')
#q.write(object='nyemba')
#q.write(object='elon')
Loading…
Cancel
Save