store engine

legacy
Steve L. Nyemba 5 years ago
parent d5705e1eb6
commit cc08f85297

@ -1,127 +1,457 @@
""" """
This file is a wrapper for stripe and store handling operations : This file is a wrapper for stripe and store handling operations :
- subscribe
- user @TODO:
- upgrade - Add logging using transport
- downgrade
- plans (for a given product)
""" """
import stripe import stripe
from utils.params import PARAMS as SYS_ARGS from utils.params import PARAMS as SYS_ARGS
from utils import void from utils import void
import json
class User :
def __init__(self,email,plans):
self.plans = plans
self.me = {"plan":[],"customer":{},"payments":[]}
self.init(email)
self.format() #-- creating properties
pass
def info(self):
return self.me['customer']
def _add_card(self,card):
try:
stripe.token.Token.create(customer=self.me["customer"]['id'],card=card)
self.init(self.me["customer"]['email'])
return 1
except Exception as error :
print (error)
return 0
def charge(self,**args) :
"""
This function will charge a user for a service (given a plan id)
:plan_id plan identifier for the current product
:card_index provide a token (otherwise will use a valid card)
"""
plan_id = args['plan_id']
if len(self.me["payments"]) == 1 :
card_index = 0
else:
card_index = args['card_index']
plan = [item[plan_id] for item in self.plans if item['id'] == plan_id]
card = {} if plan['amount'] == 0 else self.me["payments"][card_index]
params = {}
params['customer'] = self.me["customer"]['id']
params ['amount'] = plan['amount']
params ['currency'] = plan['currency']
params ['description'] = " ".join([self.product['statement_descriptor'],plan['nickname']])
if 'source' in card :
params ['source'] = card['source']
stripe.charge.Charge.create(params)
del params['amount'], params['currency']
class Store : stripe.invoice.Invoice.create(**params)
# stripe_keys = { pass
# "secret_key":SYS_ARGS['stripe']['secret'].strip(),
# "publishable_key":SYS_ARGS['stripe']['pub'].strip() def format(self):
# }
# stripe.api_key = self.stripe_keys['secret_key'] self.plan = void()
self.plan.info = lambda: self.me["plan"]
# self.user.plan.cancel = lambda id: stripe.subscription.Subscription.delete(self.customer['id'],prorate=True,invoice_now=True)
def __init__(self,**args): self.card = void ()
self.get = void() self.card.exists = lambda: len(self.me["payments"]) > 0
self.get.user = self._get_user self.card.add = self._add_card
self.get.plans = self._get_plans self.card.charge = self.charge
product_name = args['product'] self.card.html = lambda: [{} for card in self.me["payments"]]
stripe.api_key = args['secret'] if 'secret' in args else stripe.api_key self.card.delete = lambda index: stripe.customer.Customer.delete_source(self.me["customer"]['id'],self.me["payment"][index])
if stripe.api_key :
def init(self,email):
self.__init_product(product_name) #
# get customer information
# email = args['email']
# self.user_plan = []
# self.customer = {}
customers = stripe.customer.Customer.list(email=email).to_dict_recursive()['data']
if customers :
customers = customers[0]
else: else:
print (['secret' in args, stripe.api_key is None]) #
pass # The user doesn't exist, we must create the user (without payment initially)
def __init_product(self,product_name): # We can probably assign the user to a free plan if available
PRODUCT_INDEX = 2 #
DATA_TYPE_INDEX = 1 customers = stripe.customer.Customer.Create(email=email)
STATUS_INDEX = 0 free_plan = [item for item in self.plans if item['amount'] == 0]
if free_plan :
free_plan = free_plan[0]
self.plan.subscribe(free_plan['id'],email)
# self.customer = {"email":email,"id":customers['id']}
self.me["customer"] = {"email":email,"id":customers['id']}
#
# extract payments available,
if 'sources' in customers and 'data' in customers['sources'] :
if customers['sources']['data'] :
# self.payment = [ card.to_dict_recursive() for card in customers['sources']['data'] ]
self.me['payments'] = [ card.to_dict_recursive() for card in customers['sources']['data'] ]
info = customers #self.me['customer']
subscriptions = info['subscriptions']['data']
ids = [plan['id'] for plan in self.plans ]
_found = None
for sub in subscriptions :
aplan = sub['plan']
if set([aplan['id']]) & set(ids):
_found = sub.to_dict_recursive()
break
self.me["plan"] = _found if _found else []
# self.user_plan = _found if _found else None
class Store :
def __init__(self,**args):
"""
Creating an instance of a store. A store is a proxy to stripe (https://stripe.com)
:name name of the product
:secret stripe api key
"""
self.products = []
self.plans = []
self.product = {}
product_name = args['name']
stripe.api_key = args['secret'] if 'secret' in args else stripe.api_key
resp = stripe.product.Product.list().to_dict_recursive()['data'] resp = stripe.product.Product.list().to_dict_recursive()['data']
self.product = []
self.plan = []
if resp : if resp :
self.products = [item.to_dict_recursive() for item in resp]
if resp : if len(self.products) > 1 :
self.product = [p for p in resp if p['name'] == product_name] self.product = [item for item in self.products if item['name'] == product_name]
self.product = self.product[0] if self.product else [] self.product = self.product[0] if self.product else []
else: else:
self.product = resp[0] self.product = self.products[0]
#
if 'id' in self.product : # if we have a product we should get the list of plans associated
self.__init_plan(self.product['id']) #
def init(self,email):
def __init_plan(self,id):
""" """
This function will retrieve plan information associated with the given product id This function pulls a user's information, think of it like a customer walks into a store ...
:email
""" """
self.plans = stripe.plan.Plan.list(product=id).to_dict_recursive()['data'] pass
def _get_user(self,**args): class Plans(Store) :
def __init__(self,**args):
""" """
This function will return a customer information and associated plan (hopefully) This function will retrieve plan information associated with the given product id
:email
""" """
email = args['email'] Store.__init__(self,**args)
customers = stripe.customer.Customer.list(email=email).to_dict_recursive()['data'] self.plans = stripe.plan.Plan.list(product=self.product['id']).to_dict_recursive()['data']
if not customers: for item in self.plans :
return None index = self.plans.index(item)
self.plans[index] = item.to_dict_recursive()
if 'features' in self.plans[index]['metadata'] :
self.plans[index]['metadata']['features'] = json.loads(self.plans[index]['metadata']['features'])
self.plans.sort(key=lambda item: item['amount'])
if 'email' in args :
self.user = User(args['email'],self.plans)
else: else:
customers = customers[0] self.user = None
return {"email":email,"id":customers['id']} # self.plans = [item.to_dict_recursive() for item in self.plans]
self.format()
pass def format(self):
def __cancel_plan(self,**args) : self.get = void()
pass self.get.plans = lambda:self.plans
def __set_charge(self,**args): self.get.products = lambda:self.products
self.plan = void()
self.plan.upgrade = self.upgrade
self.plan.cancel = self.cancel
self.plan.subscribe = self.subscribe
def cancel(self,id) :
"""
This function will cancel a subscription given a user's email/or a subscription for the given product/user
We are assuming one subscription one plan, no complex billing is involved (not good for startups)
:email,subscription
"""
stripe.subscription.Subscription.delete(id,prorate=True,invoice_now=True)
pass pass
def __set_plan(self,**args): def upgrade(self,id,email):
""" """
This function is effectively a subscription for the current user A change of plan suggests cancelling the current plan and assigning a new one.
If it's a paid plan, the calling code must insure the user has an active token
""" """
plan = args['plan'] if not self.user and email:
email = args['email'] self.user = User(email,self.plans)
user_plan = args['user_plan'] if 'user_plan' in args else self.get.plans(email=email)
if plan == user_plan : sub_id = self.user.plan.info()['id']
# try:
# There is nothing to do here self.cancel(sub_id)
pass self.subscribe(id,email)
else:
# #
# we need to cancel, the user_plan unless it is free # We need to reload everything
self.user.init(email)
except Exception as error :
print (error) ;
return 0
return 1
pass
pass pass
def _get_plans(self,**args) : def subscribe(self,id,email):
""" """
The product name is provided in the constructor and at this poit, if a user email is not provided, Subscribe a user to a given plan assuming the user has already been initialized
This function operates as an accessor to the plans attribute. If an email is provided The responsibility falls upon the calling code to perform the cancellatioin of the existing plan and perform an upgrade
The function will return the plans associated for a user given the current product.
:email user email
""" """
if 'email' in args : if not self.user :
# self.user = User(email,self.plans)
email = args['email'] plan = {"plan": id}
info = stripe.customer.Customer.list(email=email).to_dict_recursive()['data'] amount = sum([item['amount'] for item in self.plans if item['id'] == 0])
if info : if 'email' in self.user.info() and email == self.user.info()['email'] :
# email = self.user.info()['email']
# else:
_found = None self.user.init(email)
ids = [plan['id'] for plan in self.plans ] #
subscriptions = info[0]['subscriptions']['data'] # We must insure the user is one of our customers i.e perhaps
for sub in subscriptions : if amount == 0 or self.payment :
aplan = sub['plan'] stripe.subscription.Subscription.create(
if set([aplan['id']]) & set(ids): customer=self.user.info()['id'], #if not email else email,
_found = sub items=[plan]
break )
return 1
info = _found if _found else None else:
return 0
class factory:
@staticmethod
def instance (**args) :
return Plans(**args)
# if 'email' in args :
# store = Plans(**args)
# else:
# store = Plans(**args)
# return store
# class _Store :
# # stripe_keys = {
# # "secret_key":SYS_ARGS['stripe']['secret'].strip(),
# # "publishable_key":SYS_ARGS['stripe']['pub'].strip()
# # }
# # stripe.api_key = self.stripe_keys['secret_key']
# def __init__(saelf,**args):
# self.payment = []
# self.user_plan = {}
# self.customer = {}
# self.get = void()
# # self.get.user = void()
# # self.get.user.info = self._get_user
# # self.get.user.plan = lambda: self.user_plan if self.user_plan else None
# # self.add = void()
# # self.add.user = self.init
# # self.add.card = lambda card: stripe.token.Token.create(customer=customer['id'],card=card)
# self.user = void()
# self.user.init = self.init
# self.user.info = self._get_user
# self.user.plan = void()
# self.user.plan.info = lambda: self.user_plan if self.user_plan else None
# # self.user.plan.cancel = lambda id: stripe.subscription.Subscription.delete(self.customer['id'],prorate=True,invoice_now=True)
# self.user.card = void ()
# self.user.card.exists = lambda: len(self.payment) > 0
# self.user.card.add = self._add_card
# self.user.card.charge = self._apply_charge
# self.user.card.html = lambda: [{} for card in self.payment]
# self.user.card.delete = lambda index: stripe.customer.Customer.delete_source(self.customer['id'],self.payment[index])
# self.get.plans = self._get_plans
# self.has = void ()
# product_name = args['product']
# stripe.api_key = args['secret'] if 'secret' in args else stripe.api_key
# if stripe.api_key :
# self.__init_product(product_name)
# else:
# print (['secret' in args, stripe.api_key is None])
# self.plan = void()
# self.plan.cancel = self.__cancel_plan
# self.plan.subscribe = self.__subscribe_plan
# self.plan.upgrade = self.__upgrade_plan
# def _apply_charge(self,**args) :
# """
# This function will charge a user for a service (given a plan id)
# :plan_id plan identifier for the current product
# :card_index provide a token (otherwise will use a valid card)
# """
# plan_id = args['plan_id']
# if len(self.payment) == 1 :
# card_index = 0
# else:
# card_index = args['card_index']
# plan = [item[plan_id] for item in self.plans if item['id'] == plan_id]
# card = {} if plan['amount'] == 0 else self.payment[card_index]
# params = {}
# params['customer'] = self.customer['id']
# params ['amount'] = plan['amount']
# params ['currency'] = plan['currency']
# params ['description'] = " ".join([self.product['statement_descriptor'],plan['nickname']])
# if 'source' in card :
# params ['source'] = card['source']
# stripe.charge.Charge.create(params)
# del params['amount'], params['currency']
# stripe.invoice.Invoice.create(**params )
# pass
# def __init_product(self,product_name):
# PRODUCT_INDEX = 2
# DATA_TYPE_INDEX = 1
# STATUS_INDEX = 0
# resp = stripe.product.Product.list().to_dict_recursive()['data']
# self.product = []
# self.plan = []
# if resp :
# if resp :
# self.product = [p.to_dict_recursive() for p in resp if p['name'] == product_name]
# self.product = self.product[0] if self.product else []
# else:
# self.product = resp[0]
# if 'id' in self.product :
# self.__init_plan(self.product['id'])
else: # def __init_plan(self,id):
info = None # """
return info # This function will retrieve plan information associated with the given product id
else : # """
# self.plans = stripe.plan.Plan.list(product=id).to_dict_recursive()['data']
# for item in self.plans :
# index = self.plans.index(item)
# self.plans[index] = item.to_dict_recursive()
# if 'features' in self.plans[index]['metadata'] :
# self.plans[index]['metadata']['features'] = json.loads(self.plans[index]['metadata']['features'])
# #
# # sort items by amounts
# #
# self.plans.sort(key=lambda item: item['amount'])
# # self.plans = [item.to_dict_recursive() for item in self.plans]
# def init(self,email):
# #
# # get customer information
# # email = args['email']
# self.user_plan = []
# self.customer = {}
# customers = stripe.customer.Customer.list(email=email).to_dict_recursive()['data']
# if customers :
# customers = customers[0]
# else:
# #
# # The user doesn't exist, we must create the user (without payment initially)
# # We can probably assign the user to a free plan if available
# #
# customers = stripe.customer.Customer.Create(email=email)
# free_plan = [item for item in self.plans if item['amount'] == 0]
# if free_plan :
# free_plan = free_plan[0]
# self.plan.subscribe(free_plan['id'],email)
# self.customer = {"email":email,"id":customers['id']}
# #
# # extract payments available,
# if 'sources' in customers and 'data' in customers['sources'] :
# if customers['sources']['data'] :
# self.payment = [ card.to_dict_recursive() for card in customers['sources']['data'] ]
# #
# # user plans
# info = customers
# subscriptions = info['subscriptions']['data']
# ids = [plan['id'] for plan in self.plans ]
# _found = None
# for sub in subscriptions :
# aplan = sub['plan']
return self.plans # if set([aplan['id']]) & set(ids):
# _found = sub.to_dict_recursive()
# break
# self.user_plan = _found if _found else None
# def _get_user(self,**args):
# """
# This function will return a customer information and associated plan (hopefully)
# :email
# """
# return self.customer
# def __cancel_plan(self,id) :
# """
# This function will cancel a subscription given a user's email/or a subscription for the given product/user
# We are assuming one subscription one plan, no complex billing is involved (not good for startups)
# :email,subscription
# """
# stripe.subscription.Subscription.delete(id,prorate=True,invoice_now=True)
# pass
# def __upgrade_plan(self,id,email=None):
# """
# A change of plan suggests cancelling the current plan and assigning a new one.
# If it's a paid plan, the calling code must insure the user has an active token
# """
# email = self.customer['email'] if email is None else email
# sub_id = self.user.plan.info()['id']
# try:
# self.plan.cancel(sub_id)
# self.plan.subscribe(id,email)
# #
# # We need to reload everything
# self.user.init(email)
# except Exception as error :
# print (error) ;
# return 0
# return 1
# pass
# def __subscribe_plan(self,id,email):
# """
# Subscribe a user to a given plan assuming the user has already been initialized
# The responsibility falls upon the calling code to perform the cancellatioin of the existing plan and perform an upgrade
# """
# plan = {"plan": id}
# amount = sum([item['amount'] for item in self.plans if item['id'] == 0])
# if 'email' in self.customer and email == self.customer['email'] :
# email = self.customer['email']
# else:
# self.init(email)
# #
# # We must insure the user is one of our customers i.e perhaps
# if amount == 0 or self.payment :
# stripe.subscription.Subscription.create(
# customer=self.customer['id'], #if not email else email,
# items=[plan]
# )
# return 1
# else:
# return 0
# def _add_card(self,card):
# try:
# stripe.token.Token.create(customer=customer['id'],card=card)
# self.user.init(self.customer['email'])
# return 1
# except Exception as error :
# print (error)
# return 0
# def _get_plans(self,**args) :
# """
# This function will provide an plan for a given user given an email or the list of plans
# :email user email
# """
# return self.user_plan if 'email' in args else self.plans

Loading…
Cancel
Save