refactored user handling object to comply with new API

legacy
Steve L. Nyemba 6 years ago
parent ed2466d749
commit 0368d78496

@ -0,0 +1,180 @@
"""
This file consists in managing all things user related
"""
from utils.transport import CouchdbReader, CouchdbWriter
from threading import Thread
import json
class User :
def __init__(self,**args) :
"""
@param product identifier of the product we are working with
@param stripe stripe handler
@param store transport store (couchdb, ...)
"""
self.stripe = args["stripe"]
self.store = args['store']
self.store['dbname'] = args['product']
self.product = args['product']
self.me = {}
self.init()
def init(self):
lproducts = self.stripe.Product.list()
lproducts = [item for item in lproducts.auto_paging_iter() if item.name == self.product ]
if lproducts :
self.me['info'] = {"active":lproducts[0].active,"id":lproducts[0].id,"description":lproducts[0].statement_descriptor,"images":lproducts[0].images}
def init_customer(self,uid):
"""
This function copies a customer to the internal database
"""
customer = self.stripe.Customer.list(email=uid)
if customer :
customer = [item for item in customer if item.email == uid]
customer = customer[0]
#
# Insure the product is the one we are looking for ...
args = dict(self.store)
reader = CouchdbReader(**args)
key = reader.view('users/uid_map',key=uid)
if not key :
self.store['uid'] = customer.id
self.update(_id=customer.id,emails=[uid])
self.post()
#-- housekeeping work
return customer
else:
return None
def update(self,**args):
for key in args :
value = args[key]
if key in self.me :
if not isinstance(self.me[key],dict) or not isinstance(value,dict) :
if not isinstance(self.me[key],list) :
self.me[key] = [self.me[key],value]
elif isinstance(self.me[key],list):
self.me[key] += value if isinstance(value,list) else [value]
else:
self.me[key] = dict(self.me[key],**value)
else:
self.me[key] = value
def post(self,**args):
args = dict(self.store)
args['create'] = True
writer = CouchdbWriter(**args)
writer.set(self.me)
# writer.close()
def get(self,uid,key):
args = dict(self.store)
args['uid'] = 'logs'
couchdb = CouchdbReader(**args)
document = couchdb.view('users/uid_map',key=uid)
return document
# def plans(self,free=False):
# lproducts = self.stripe.Product.list()
# plans = [item for item in lproducts.auto_paging_iter() if item.name == self.product ]
# if free == True :
# plans = [item for item in plans if item.amount == 0]
# return plans
def subscribe(self,uid,pid=None,stripeToken=None) :
"""
@param uid user's email
@param pid plan id
@param stripeToken stripe token to process payments
"""
# customer = self.stripe.Customer.list(email=uid).data
# if customer :
# customer = [item for item in customer if item.email == uid]
# customer = customer[0]
#
customer = self.init_customer(uid)
if pid is None :
#
# In this block we try to find the free plan if one isn't specified
# The informtion about the product is already known (constructor)
#
# lproducts = self.stripe.Product.list()
# lproducts = [item for item in lproducts.auto_paging_iter() if item.name == self.product ]
product_id = self.me['info']['id']
plans = self.stripe.Plan.list(product=product_id)
plans = [item for item in plans.auto_paging_iter() if item.amount == 0 ]
pid = None if not plans else plans[0].id
if not customer :
customer = self.stripe.Customer.create(email=uid)
self.update(_id=customer.id,emails=[uid])
# else:
# customer = customer
self.store['uid'] = customer.id
args = {"customer":customer.id,"items":[{"plan":pid}]}
if stripeToken:
args['source'] = stripeToken
if customer.subscriptions.data :
lsub = customer.subscriptions ;
found = False
for sub in lsub.data :
if sub.plan.id == pid :
found = True
break
# for sub in lsub :
# found = [ for item in lsub.d
# found = [plan for plan in customer.subscriptions.data if plan.id == pid and plan.active == True]
# print " found ",len(found) > 0
else:
found = False
if found is False :
sub = self.stripe.Subscription.create(**args)
info = {sub.plan.nickname:sub.plan}
self.update(subscriptions=info)
self.post()
else:
pass
#
# keep a copy of this on our servers ...
#
def refresh(self,uid):
parent = (self)
def _update(customer_id) :
customer = parent.stripe.Customer.retrieve(customer_id)
parent.store['uid'] = customer.id
lsub = customer.subscriptions
reader = CouchdbReader(**parent.store)
parent.me = reader.read()
product_id = parent.me['info']['id']
[parent.update(subscriptions={sub.plan.nickname:sub.plan}) for sub in lsub.auto_paging_iter() if sub.plan.product == product_id and sub.plan.active == True]
parent.post()
reader = CouchdbReader(**self.store)
key = reader.view('users/uid_map',key=uid)
if key :
if isinstance(key,list) :
key = key[0]['value']
elif 'value' in key :
key = key['value']
thread = Thread(target=_update,args=(key,))
thread.start()
# thread.start()
def pay(self,uid,pid,stripeToken):
pass

@ -555,6 +555,7 @@ class CouchdbWriter(Couchdb,Writer):
@param dbname database name (target) @param dbname database name (target)
""" """
def __init__(self,**args): def __init__(self,**args):
Couchdb.__init__(self,**args)
uri = args['uri'] uri = args['uri']
self.uid = args['uid'] self.uid = args['uid']
if 'filename' in args: if 'filename' in args:
@ -567,7 +568,13 @@ class CouchdbWriter(Couchdb,Writer):
# #
# If the document doesn't exist then we should create it # If the document doesn't exist then we should create it
# #
def set(self,document):
_document = self.dbase.get(self.uid)
if _document :
document['_id'] = _document['_id']
document['_rev']= _document['_rev']
self.dbase.save_doc(document)
""" """
write a given attribute to a document database write a given attribute to a document database
@param label scope of the row repair|broken|fixed|stats @param label scope of the row repair|broken|fixed|stats

@ -0,0 +1,42 @@
from api.User import User
import unittest
import json
import stripe
f = open('../config.json')
CONFIG = json.loads(f.read())
f.close()
stripe.api_key = CONFIG['stripe']['secret']
class TestUser(unittest.TestCase):
def test_CreateUser(self):
user = User(stripe=stripe,store=CONFIG['couchdb'],product='music')
self.assertTrue('_id' not in user.me)
self.assertTrue('info' in user.me)
def test_EditUser(self):
user = User(stripe=stripe,store=CONFIG['couchdb'],product='music')
user.update(emails=['nyemba@gmail.com'])
user.update(emails='steve@the-phi.com')
# user.post()
self.assertTrue(user.me)
self.assertTrue('emails' in user.me)
for email in ['nyemba@gmail.com','steve@the-phi.com'] :
self.assertTrue(email in user.me['emails'])
def test_SubscribeNewUser(self):
user = User(stripe=stripe,store=CONFIG['couchdb'],product='music')
user.subscribe('nyemba@gmail.com')
# self.assertTrue('subscriptions' in user.me)
pass
def test_UpdatePlan(self):
user = User(stripe=stripe,store=CONFIG['couchdb'],product='music')
user.refresh("nyemba@gmail.com")
def test_UserSubscriptions(self):
pass
def test_Products(self):
pass
if __name__ == '__main__':
unittest.main()
Loading…
Cancel
Save