From 0b15351b8efc9612e1897086469c090214c8effa Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Mon, 16 Sep 2019 23:08:43 -0500 Subject: [PATCH] data transport framework for rabbitmq, mongodb, couchdb, ... --- setup.py | 19 ++++ transport/__init__.py | 209 +++++++++++++++++++++++++++++++++++++++++ transport/__init__.pyc | Bin 0 -> 2005 bytes transport/common.py | 154 ++++++++++++++++++++++++++++++ transport/common.pyc | Bin 0 -> 4344 bytes transport/couch.py | 199 +++++++++++++++++++++++++++++++++++++++ transport/couch.pyc | Bin 0 -> 5405 bytes transport/couchdb.pyc | Bin 0 -> 5447 bytes transport/disk.py | 82 ++++++++++++++++ transport/disk.pyc | Bin 0 -> 2855 bytes transport/mongo.py | 66 +++++++++++++ transport/mongo.pyc | Bin 0 -> 3162 bytes transport/queue.py | 200 +++++++++++++++++++++++++++++++++++++++ transport/queue.pyc | Bin 0 -> 6988 bytes transport/s3.py | 83 ++++++++++++++++ transport/s3.pyc | Bin 0 -> 3141 bytes transport/session.py | 66 +++++++++++++ 17 files changed, 1078 insertions(+) create mode 100644 setup.py create mode 100644 transport/__init__.py create mode 100644 transport/__init__.pyc create mode 100644 transport/common.py create mode 100644 transport/common.pyc create mode 100644 transport/couch.py create mode 100644 transport/couch.pyc create mode 100644 transport/couchdb.pyc create mode 100644 transport/disk.py create mode 100644 transport/disk.pyc create mode 100644 transport/mongo.py create mode 100644 transport/mongo.pyc create mode 100644 transport/queue.py create mode 100644 transport/queue.pyc create mode 100644 transport/s3.py create mode 100644 transport/s3.pyc create mode 100644 transport/session.py diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..acd5d78 --- /dev/null +++ b/setup.py @@ -0,0 +1,19 @@ +""" +This is a build file for the +""" +from setuptools import setup, find_packages + +setup( + name = "data-transport", + version = "1.0", + author = "The Phi Technology LLC", + author_email = "steve@the-phi.com", + license = "MIT", + packages=['transport'], + install_requires = ['pymongo','numpy','cloudant','pika','boto','flask-session','smart_open'], + + use_2to3=True, + convert_2to3_doctests=['src/your/module/README.txt'], + use_2to3_fixers=['your.fixers'], + use_2to3_exclude_fixers=['lib2to3.fixes.fix_import'], + ) diff --git a/transport/__init__.py b/transport/__init__.py new file mode 100644 index 0000000..9b4f540 --- /dev/null +++ b/transport/__init__.py @@ -0,0 +1,209 @@ +""" +Data Transport - 1.0 +Steve L. Nyemba, The Phi Technology LLC + +This module is designed to serve as a wrapper to a set of supported data stores : + - couchdb + - mongodb + - Files (character delimited) + - Queues (RabbmitMq) + - Session (Flask) + - s3 +The supported operations are read/write and providing meta data to the calling code +Requirements : + pymongo + boto + couldant +The configuration for the data-store is as follows : + couchdb: + { + args:{ + url:, + username:, + password:, + dbname:, + uid: + } + } + RabbitMQ: + { + + } + Mongodb: + { + args:{ + host:, #localhost:27017 + username:, + password:, + dbname:, + uid:s + + } + } +""" +__author__ = 'The Phi Technology' +import numpy as np +import json +import importlib +from common import Reader, Writer #, factory +# import disk +# import queue +# import couch +# import mongo +# import s3 +class factory : + @staticmethod + def instance(**args): + """ + This class will create an instance of a transport when providing + :type name of the type we are trying to create + :args The arguments needed to create the instance + """ + source = args['type'] + params = args['args'] + anObject = None + + if source in ['HttpRequestReader','HttpSessionWriter']: + # + # @TODO: Make sure objects are serializable, be smart about them !! + # + aClassName = ''.join([source,'(**params)']) + + + else: + + stream = json.dumps(params) + aClassName = ''.join([source,'(**',stream,')']) + try: + anObject = eval( aClassName) + #setattr(anObject,'name',source) + except Exception,e: + print ['Error ',e] + return anObject + +# class Reader: +# def __init__(self): +# self.nrows = 0 +# self.xchar = None + +# def row_count(self): +# content = self.read() +# return np.sum([1 for row in content]) +# def delimiter(self,sample): +# """ +# This function determines the most common delimiter from a subset of possible delimiters. +# It uses a statistical approach (distribution) to guage the distribution of columns for a given delimiter + +# :sample sample string/content expecting matrix i.e list of rows +# """ + +# m = {',':[],'\t':[],'|':[],'\x3A':[]} +# delim = m.keys() +# for row in sample: +# for xchar in delim: +# if row.split(xchar) > 1: +# m[xchar].append(len(row.split(xchar))) +# else: +# m[xchar].append(0) + + + +# # +# # The delimiter with the smallest variance, provided the mean is greater than 1 +# # This would be troublesome if there many broken records sampled +# # +# m = {id: np.var(m[id]) for id in m.keys() if m[id] != [] and int(np.mean(m[id]))>1} +# index = m.values().index( min(m.values())) +# xchar = m.keys()[index] + +# return xchar +# def col_count(self,sample): +# """ +# This function retirms the number of columns of a given sample +# @pre self.xchar is not None +# """ + +# m = {} +# i = 0 + +# for row in sample: +# row = self.format(row) +# id = str(len(row)) +# #id = str(len(row.split(self.xchar))) + +# if id not in m: +# m[id] = 0 +# m[id] = m[id] + 1 + +# index = m.values().index( max(m.values()) ) +# ncols = int(m.keys()[index]) + + +# return ncols; +# def format (self,row): +# """ +# This function will clean records of a given row by removing non-ascii characters +# @pre self.xchar is not None +# """ + +# if isinstance(row,list) == False: +# # +# # We've observed sometimes fields contain delimiter as a legitimate character, we need to be able to account for this and not tamper with the field values (unless necessary) +# cols = self.split(row) +# #cols = row.split(self.xchar) +# else: +# cols = row ; +# return [ re.sub('[^\x00-\x7F,\n,\r,\v,\b,]',' ',col.strip()).strip().replace('"','') for col in cols] + +# def split (self,row): +# """ +# This function performs a split of a record and tries to attempt to preserve the integrity of the data within i.e accounting for the double quotes. +# @pre : self.xchar is not None +# """ + +# pattern = "".join(["(?:^|",self.xchar,")(\"(?:[^\"]+|\"\")*\"|[^",self.xchar,"]*)"]) +# return re.findall(pattern,row.replace('\n','')) + + +# class Writer: + +# def format(self,row,xchar): +# if xchar is not None and isinstance(row,list): +# return xchar.join(row)+'\n' +# elif xchar is None and isinstance(row,dict): +# row = json.dumps(row) +# return row +# """ +# It is important to be able to archive data so as to insure that growth is controlled +# Nothing in nature grows indefinitely neither should data being handled. +# """ +# def archive(self): +# pass +# def flush(self): +# pass + +# class factory : +# @staticmethod +# def instance(**args): + +# source = args['type'] +# params = args['args'] +# anObject = None + +# if source in ['HttpRequestReader','HttpSessionWriter']: +# # +# # @TODO: Make sure objects are serializable, be smart about them !! +# # +# aClassName = ''.join([source,'(**params)']) + + +# else: + +# stream = json.dumps(params) +# aClassName = ''.join([source,'(**',stream,')']) +# try: +# anObject = eval( aClassName) +# #setattr(anObject,'name',source) +# except Exception,e: +# print ['Error ',e] +# return anObject \ No newline at end of file diff --git a/transport/__init__.pyc b/transport/__init__.pyc new file mode 100644 index 0000000000000000000000000000000000000000..0311a202464c6bf90217bb05bf7320139c170dbd GIT binary patch literal 2005 zcma)7QEwYX5T3IgJH7^`AOWqA&`P{Gp-Eeih!jN;qO?>Z4b+L0VyQZ6=&cemHw zy%-ByLMlIk7ybeNju+nGo4s>d1thp=H?y;|GvCh4w{HLOPFSA({4%EIYvccy81^Nm zglL30piw|ci;^~FEgH2~a)(BpmApoyH7|E4>r&pMQIGO<8m&{1V53J#mwq97P4vU4 z&z?>O>m26K7E$=MbW)reS=dq=Cw9cc-3Q@`Q&&nH?TX{M%E$7aIGrl-{WKM)DxMZP z)04S4I(ig_r_(!GqgXJKeN+V+jw~5NqJcX;R#{PMwDmEF)@faJ= zUadAxl(nfY#NbIL?d3{qKgT`PTZlBkN(V!LH%b^KllwD+2tpQ#D2={KleCzKTmiX< zkDEETh-H@XMywMR4%MqFH7ZwyKCYN0vT#0jtc&w>Qq@t#xkj$w zj$QBg%y4p%%5$A%dgi?~8G_&bPZ)h^CU)OT{mNwf2bd1;@kYNwB?_4<=o+cnD5bSC zZIbnx3mZ5zQ4ESePr+?u)9)K9+x|oJ2 zy)s@r;|4IU2kklfPMdiQ(SL0I6^8u~lLB!AQlKn()`7uo8lDUS?D-SyVGZM34Er0V zh3EO*W#FY2-Cg2vNXs@Y!2Mfv*(GyGpz;MQpy|@2MXv)|cBt4U*P;tp)*8_L+N?(l zEZ1q$X^iK$$#rN^FWYohd`um;MwcO(Uny92@m%z2;k|$Jvc}s4CDgWoS8F$ zz^B?bAKL;$3MoZU{_XGCRV-$YbcH%Z&`ddzEH7qj8Z=0k{lzY z-po3bCHFwp0Ndmz`#z-W!rYIdv`AeP?Ur-zaFx70SlK&#suu=f&~0sW?yPMF9|i5; zc55@(3)wkgw^v z7e#o1u4Bpy0SuO$)B;JJgAS0&;VnMEc};WvoypR%&rPgzyuw51X~>oI(f&6!pQ(fT UqKBX2xaGTcgKn_l|32vc4Nc 1: + m[xchar].append(len(row.split(xchar))) + else: + m[xchar].append(0) + + + + # + # The delimiter with the smallest variance, provided the mean is greater than 1 + # This would be troublesome if there many broken records sampled + # + m = {id: np.var(m[id]) for id in m.keys() if m[id] != [] and int(np.mean(m[id]))>1} + index = m.values().index( min(m.values())) + xchar = m.keys()[index] + + return xchar + def col_count(self,sample): + """ + This function retirms the number of columns of a given sample + @pre self.xchar is not None + """ + + m = {} + i = 0 + + for row in sample: + row = self.format(row) + id = str(len(row)) + #id = str(len(row.split(self.xchar))) + + if id not in m: + m[id] = 0 + m[id] = m[id] + 1 + + index = m.values().index( max(m.values()) ) + ncols = int(m.keys()[index]) + + + return ncols; + def format (self,row): + """ + This function will clean records of a given row by removing non-ascii characters + @pre self.xchar is not None + """ + + if isinstance(row,list) == False: + # + # We've observed sometimes fields contain delimiter as a legitimate character, we need to be able to account for this and not tamper with the field values (unless necessary) + cols = self.split(row) + #cols = row.split(self.xchar) + else: + cols = row ; + return [ re.sub('[^\x00-\x7F,\n,\r,\v,\b,]',' ',col.strip()).strip().replace('"','') for col in cols] + + def split (self,row): + """ + This function performs a split of a record and tries to attempt to preserve the integrity of the data within i.e accounting for the double quotes. + @pre : self.xchar is not None + """ + + pattern = "".join(["(?:^|",self.xchar,")(\"(?:[^\"]+|\"\")*\"|[^",self.xchar,"]*)"]) + return re.findall(pattern,row.replace('\n','')) + + +class Writer: + + def format(self,row,xchar): + if xchar is not None and isinstance(row,list): + return xchar.join(row)+'\n' + elif xchar is None and isinstance(row,dict): + row = json.dumps(row) + return row + """ + It is important to be able to archive data so as to insure that growth is controlled + Nothing in nature grows indefinitely neither should data being handled. + """ + def archive(self): + pass + def flush(self): + pass + +# class factory : +# @staticmethod +# def instance(**args): +# """ +# This class will create an instance of a transport when providing +# :type name of the type we are trying to create +# :args The arguments needed to create the instance +# """ +# source = args['type'] +# params = args['args'] +# anObject = None + +# if source in ['HttpRequestReader','HttpSessionWriter']: +# # +# # @TODO: Make sure objects are serializable, be smart about them !! +# # +# aClassName = ''.join([source,'(**params)']) + + +# else: + +# stream = json.dumps(params) +# aClassName = ''.join([source,'(**',stream,')']) +# try: +# anObject = eval( aClassName) +# #setattr(anObject,'name',source) +# except Exception,e: +# print ['Error ',e] +# return anObject \ No newline at end of file diff --git a/transport/common.pyc b/transport/common.pyc new file mode 100644 index 0000000000000000000000000000000000000000..91a168c9e500c597f44fd70800f6455024b930e2 GIT binary patch literal 4344 zcmcInYi}G$6|J79XEq^_Y!b{asN+TKIN08x>@Hx15Ury~h+v>?u-1`1^mLa!ZTHJv zHGYV(SSc$WpZN!fU%^k}D?b3vxivE$cCoTw7`yM(<95}pI``arr~a>--M{_zv%N@7 zK6QLQ!((pw)&IfTqwbc~r`mtH=5Jo;Y!I&a+e7XD zYMA)jIvVChS{z>ZFJEqSyW7LW_*oH;QthJ^Ym*%2I`(bh8(m?I(Dq9tNiQq%!(!HZk)+VTPBa3BYohDg= zrIsi9zaHrkRf2FZz{D?3rW0G*n54-4-itIe$J0^s80XM)78Niyv`~ccmG&zg#vhzj z*uf9;*e|Q%G>Mb^(9bk17xQs4izT8kO*s)2vF--?WRz4o)4AorI^~6&quUu2w&-?n z#54|b+wGd$K;z3doctS~P20j`01I>#p(P*Srar-AoTWA}usl1LS zCUQRxh>0s`z^o(j^-E7#PaW0NAqGA5eD8+3l)Koksgp(Z+M5|pi=5$HiO^b}C@(En zH=|4@15VmDL{#J!2+Ax@`4!I4!CTPm1K`|#hl{#H-0&jq#0%#m+x-)bq(Q53+aL3* z=DzCgtD!#ri&B4{9kZs+hwgyCQHtf%)gR|fcn*z;N!SU7H5FWx7E0GGd)d6Q zdE8Wsw5|aN7I0kiHX!JBIvt4}2ctY9(?DWSIZJZzmQ2bDV?iVO#f;yTe^3>fAA+fa zNfapy5PE>%GmQb2S@XI0&#XT(SPRM)R5msNH+_VmstBW@--|I+C4&+7US?oA9EFEk zww|A2q>74klpz!x6cv^`OiuNjN!*Lix(Txqc1*soYlT1n&ld0T^?9jrPR5K7Gv|J? zru`H)a08MDYruh(S}~NaJ(l)*!Vw`{#Mab@NFnnZHw7N-{ak?rgxgb#aHQk9s#?mz zi-i}H2?tM%X?O>wF;}0yggLzXR}iKJsAN-$o=~CAjdB~b@hybTG-4M=PeF6qBs5xk zBwtDFn9ujK9)!A%@rIe`Nt{F$V3$vSL{+ZVQN7t#$NIuZOfhAeSP6*CCwVOUr(h@f zbQ-1z`l6TQu|AhcIFv*<$7iZJfJ9AvCZGWU4-zawh_PaPAQyaHpd!gouYM0(5wj8`S+f1Q2y0iag$Rqz;6~!gj}F5C<6HPF)ue zfm&b`SbyAB)s_Tl%r@1N9MDT^5i#VxYd$%-qjt~)$ghQ=IQOjUeBlya(2~J6b;d2V z$X(iM+=dtMUQ7r9|NL*hR@x?2=E$0lvH|k&99PhsQpH_3-hWmimog8mNp4`hkQdfx z#=xvNxVXAR34s)h16>42@F}BX2{ zO)<}X2*7wUa|(eg{Q_cVNt*gm3f`ikiHa&VL~IHZf{;JBz)Xhf25~Ac^3~8pN#f56 zEe5GKplEJk>sR|~^$#oEm76PXtt_nU;owU7BA0L#6Pzr0mzX3+e9j{+F@*t9nE4`1 zjSlWYL&6Dst&TRrzRWVVmXe$+U8Z5AB}6&kxvH#1bmc8H-i&g+?AdfehK!GC62IojZD5C@u5CIeZUCCwsft9|D5eiQ_ zz2q$hJBN^iONvh%of#5^w6!`bEt}w#E4G<95v27YYSjzA$^ofQ&k{RKa>+VjB!vkv zW=0d0V#FMJGAgV#YcrJBuV-uUJ(zFkY`veZ?~e`Xx7=Go=hgnw-uK5#OUnGuPkBzTMB+Z69n&?UqNueOxdb*U_XzG7rDHy~PGr_rf* z-tFA)+>&6n!ygd;qlV{R`b{-F3UyvdL@|9f&^CJzaI+X0y`!oXMj6H$@icT@uy;`g zetUmwqPw7~Pp7&PKiF+Dz&Kr$63i#FvU!3ZaR`{6gZC+T9l;|A!IVkdfoo9p@J8cN zR+=whDX3rF=(U&0+b{4)fuOv1yl>#S?S2O^pzx6!V7SHcUB$BLR_3I*&zmIOnAUzsAuji0w$>r8jF;B6SM?EeGdzZ2s#QZh zTAO=y0fTbLem^dvnd^rmJ1i=6X%`ZBc_Fo3@I$II=Or0`pQp({{5s$!bE~3veahsP VPaL9blh|_Mrf%!nfny5|g1AD$6>3yKx}eCBmaAQI z&6(9lA-~jlOt5hC{*I=-5!?9oRvG7JQk1sc zv-&`XPuIiEV?Ex9Z-rX}9ezH@!Y!Q)@?uo%9k<(CgUp1x*+_@D$KhcaPbRtyzeAk zz!B;k*jitLCl9$Dglwc|fCK3S^_x>n=exo3@4)R&n<%6(Beb+ZKml2kKWICb2Px zMVZRvewKDGVMatV%WyJ7B2F5dU;FrB5Kt#Yab zfhkQZV$&j-j&*M3VWWB=cJ)zatg+lhQtH_1{(LTFnfO49(9I&(U7|srj2`Vu2Y@jV z&mYMcdsyGaO-W;}qGv1by(6x-J~@_s^!r(!*?yl>%o>+?ZEpqFP4A}vj<@39@K)=q z-mR-^Y zIGDH`PY7ple!DoR^QH_giNBhUOf4BW4i7SY$f5h7tyG(A zFApmehJE+q&JliLcTyGyS*pX`>1gCg>oBu}YEn1>7iLEwFvz0NzR;YITd@mf*Yc?_ zTi2n?+1JMbF0%WMMnKRhyNDbhahVQZv+x^C27X|S6chpd5)FHZCX@aM;b=fK0x#t8Fq6R0$s8^?k~hP$3YD zZeq1SQJ6!bhB+T>L=Xl23U@nzM0XCh7lMxb&6#d--{xG1dZUKyzhZ!PrQA`Yh z{AlZb_;CW_- z_sUO*R~inYpiw-~gaT)P`eEK=4&a`9vgJ+LE{<@}C83;YMHi@fBw3JViCr97nDpG^ zM&zJB;hF_$hUYdg+V-2CBYQehbeqjO8{tRlI{F10E?;~^dS`jU9ZY?S%aDDWb&%vw zg7$J_L6fcsNiZruD*G`mapiMp(p4XzNnuCkbOzdIs3eN0(JGOzBe(0=n8VF?$jzDF z{L^i_;w^CI*;Uhg(re2mBonjJtD?1_>by#R)x`7atH4!{67K{Fr{Kouy+FcQARv7H zhRe{pswG}UbJ6RB`u~B>nO^1cUGXZg%zjIiPlYH@M=e!V5TM|>DhQ!Lq6eY_t33D} zlEw~;C_Et3M!k2e_9O+wn^`i5UI!U9>MS^^$Luf=$3+`H`R zvq#zI8d~-u8u}Ne3mQ~w)w}Aw4SFHLtROp+K)^V~jX&YMZ_s=n>w9F(nmW4cg5dkJ zk{4+^ttxL}LDm4cJ%AN)d-(jW!rwC>2WY_%Sr~;k>TM~uU!umw8>uN!V`CA3)H#vF zYtPL_p#VTE&oFl)|0bfKx4qtA%0*>GiDbYvhbs76#8&jA8 zL^Q306ABPBrql7nmFmWpk|zPw+GZ#z`FUHUu`3*g1^)3w>67VqG7G_zCiM{w4pazgFvbn`ehI+Q=mk zpQ5+;tn|)evHO_%XIw7dXDDk2*WFriw{;4Y-XruRAxetzxX2?ss)z}i<50%Lh|<2_ bhei1hT;1{!6<`R9CQiGI;`-XhoyY$NSuH&Z literal 0 HcmV?d00001 diff --git a/transport/couchdb.pyc b/transport/couchdb.pyc new file mode 100644 index 0000000000000000000000000000000000000000..2639714fe507bd151eb2901faf2dcaf12053df1d GIT binary patch literal 5447 zcmb7ITW=gm6|U~NczR;zo+X2`+7>IJ$!_96v;+|)kWDOBs|^<0V_{)w8MV91o*wsf zPr7P+LCy==*99baKuA3D19;|@f5Z=f?>jXYv$7L2a#dH?<#bh@?|kPR*Z#fIPX7Mr zt3*wIH9Vi8nAfOMrM6LfD)m+Fscke|+gIDZN^2^utGuR0b+uhrqlVgUC@*cO^pZ-O z>T9LmDD~^@rb?HTZYo_@yI9nk4=kxPkb#B_wC4jYey`VE<~II}s=XfD`2J=U7iL^m zw%xP(K!-2a!i{4++KKOln*$yGa*&0aIvEsYUhW;Y+na;Tgu7X;Lp4%1JUvcfyIuioNjj=f4PF)P$? zKlp8A9=6x@QQ0P{NXMzJq`6gPSY{jOeZgG%gr7iW)cL@dCDSslGJo zykPb;RMhL%xuihV4uWUnxQa)?q{@O|P#PP4KjgiHmEP5rE)soePL>A2#Ngmrijy3a z`GHQ`i}7@)h(|gIQrz%PY;?%iuxsOLPusigAmMg-2$nBU%r*3!2oO52eQEisbD~t= zb=OqqwcrAC_Lo%kr2-)4ZR@EaPTEu_KEUB|SxprmD%Vf3bqK?wp*qu9{&W`R4r^*y zC#bxWnwq{h_`cs%Rh{!V1Hv&{Q+7$ck!k&w$Pd)8Evs5m1D(ElT~{Y{HM~S2W6D>$gsbUeEmdRwK3o9?{vln7mA7#cG%UvXuj;-#`=Ter553~r~C=1;sD&)=R zd{-I(j)`~=k&Lm0^<6xaROTjHc4o>CrgpeCK9-&I`&p6Me*ZcqnLAwMwY?RT+um*e zLvO{u<*n9Ny=Cu?chwhc@&ha{P|R;oIlT=%swx9CYU(wV=_%{0eP5Z6WkO9YR}a|L z!={G~0ga%CtAQIJ1K9S}uqDXYc2ff=n)a~KZKICJPqJ%hG^PQ`9sfMejgGEyg3RJk zq8qGueG!izT}9R9uA|$i0Dh&$(RbO2@~(TL#xn!eq{knjX)be(cguUvt2b6_%U<1+ zi{aN;#AyG3hQ-mu{CGwfhO^?{~Ywk6A8oUkwSJ}KT8 zV}_|?(4HP_Vi3x}6~Z3zsU_8c9bwrw07Of5dffnfGXF#cHVU>hm|2JVNGB6ZN{+*W zOdoRSF(@q6Cfh5*84|<3>v(X4U)UX2pfV>f2lhMg;wV z^_|}b-=U|e==pthbb~y${?#P~E(C-9%6s*R+Dhs6oi~`9t=N|%dvO5#uBF-19v8O} z@sy`bQ>>Cgro(i?z`Lvcb#U5T^w3ee}Mx>KgQDEp-lZYJ$4&NZTn5n@&A1cMr*7duoA-LeMhue zS-$v;w9dDPA7I5x6hlI8*1?+J3);(#1#h}6CBdltc;U}c#J1nTn=U;8Zwg&9*fY5C zIb9|>;#PC0~r^dAzZe1W$mqwi&Wbe%_X8>@tdBiGNX1`grm`QHCH&ViRVW z(E?GQ@HIpwfksgtia%rUb`gkC;`Ac_fu^Bw)mR0*n*ItPcGVMD&Ug + @param row row to be written + """ + + label = params['label'] + row = params['row'] + xchar = None + if 'xchar' is not None: + xchar = params['xchar'] + path = ''.join([self.path,os.sep,label]) + if os.path.exists(path) == False: + os.mkdir(path) ; + path = ''.join([path,os.sep,self.name]) + f = open(path,'a') + row = self.format(row,xchar); + f.write(row) + f.close() + \ No newline at end of file diff --git a/transport/disk.pyc b/transport/disk.pyc new file mode 100644 index 0000000000000000000000000000000000000000..3c23d49b4548c10b6baf227265f73964cceca2ce GIT binary patch literal 2855 zcma)8ZEqVz5S~51BaaJNI@? zQ{+fUZ9nnz_#u4Y2f*{pUgAJYgXJCX?aj{4&OGysTYs+h{&@83lc6-97JhGIxSt?$ ze2t7nHZX}KZ^_umB$ACtCN0@$iD@+2veA~jEjE&ok-Q@>MP7*fxY3ckE4HPpONXmn znqBWNJIe~YWx8`{qJKP0b>BSGW3zrPXnR5vyxpl=>Y4gNaNew@eJo8yHs;fzo z!-1qfu7;T}sxrZMJFB5k7eCv}z4-RDw#hfJ!EMj`my??}lLz(8_TnM0M_)O<$8Z-Q zyvUA`IU;4m%;C9_b9frS`v}otCfs$1UL3=AR%erVn)&TG+jP}<=54~Kq#7lD+a{x8 zZ0Yi*Rj;wfx3QNhYicURlX!-FFeu8x4+beu2tlW+v*VG5T#s^nEQt#DE@r;YN;j=) z{{^FPZTdXro)Z|36X~oPz2(78@A1uh5aE_P5ig4C;;!p`Dh01RBT_kaMGv=oh4W4| zQs(G5H~bQmnvnB2M}uM%RzuEdgcluRxJM9hD7b=nzldaC!@b{>rO#P_q6Gn)u?Ay zOxwh!&&qB9SY{JD7-&@w29qkEjrqt_4hDHO1U6|djVXw}#p!QD&u~fC8E9A)`7RqftFy^&UewAe3aXS@U!?CXT*5pA z-~ceEqPUSC2VNlzncsWVx(N|(h4}2oQhh1_Ja3D7J-F?@ax3m1bWrDrS#^U&zOMec z-(cqTYRxSg&ZH=j%OY^=VQ)}kPexUp*gZR(g)UGhW#wt6xMq{WqpttGAAMn1m3cvZ z0A!R*tWTzOanueBtc6nKP16r44nwRF4=m1LYh_MVK0P+Htf8+l91|hkOXg$MC$ph)SJQdN*`^)GH>UWPVDtHAUG3U(J}UO`;C$xY zts}ai%NV0+$2`c}67RJ>)KE%-)W^Lc{zIT8HCfHp`m300{*{k+sv6=Jl583f_c5CVaFc)AO9sjr0==Mcq4?9Sf+>pmYnveFPy( zJ<|itub5SH!JIXxAWxdpcv8AgPeUbTUryhrAm9S4)5_u~o&5(lu%TSS2&~=$tFOdy sxQ!Nw+QfTFN$DYIt}0WqMas?p>lkfJH$p4AMMHYriaBFeqB9r&1` 0 : + collection.delete({_id:self.uid}) + + collecton.update_one({"_id":self.uid},document,True) + diff --git a/transport/mongo.pyc b/transport/mongo.pyc new file mode 100644 index 0000000000000000000000000000000000000000..0f52768910b05d7b3b6e683ec556283da6b90e73 GIT binary patch literal 3162 zcmc&$U2oh(6dl{Un-7wcM|xl$8P9eT>+*jKr)CV@H()TE~ld(GQG<$ZP7S7}eBn=0+AbEVFddVDxg=|CC$ z2WsrAbgfmasdQZwJyC45iglF+YBYQc=VyQ88G7hB4j+}p({eA*OyPz;Uk)Hb6Q`z< z?)O#ZOttU&^SP~8+~Ogc)euI}8TfJtgFK!KU*H!zJr$0I#K3+5v8nMqjBS?a3C~WC z^tdh(mz9OpabZAK%awW16A$Q&$jO{ z3EG}a?Wvtc2e|_<0pXb!yih9gRI#Nn7uYsK?!2aUIFOwIstmBTQYW5BY(hOkvjJc? zY1AJOVlO@9^L2Vu#1kX^Gnod+gS>{vvFps#D#b}{mur%`s<1|LqjrVs!7Umj9jY9| zFkdKx`N?h@8lL>fB*$@)*@@+w(O&rA(XG+94{m*a=dNQQ>nxS*Y5>|FGP^>wCjAmg z$8du$iL7H{rbm)PVudyNSUPd_)P^*0h}H;u89i6Ug(Y~mCC5IUJ{LQqC@V4-McmoG z%_VD_{&jD|+r-~Rf6L$Yw!JpnXy_hiK7e548|kSNpSgm3dpXaAE-#eYIqA*Sl0D^p zX5mOEH}jNEg=C+*0+DCdMf2!J45k&|-PO4=Df8S2MH;mr5fWNQYD`7`@Oli^Fgddo z*!}zlx|Up5-gWOoPa-hGh!#YVqmfZGDbqUVkZYqTEt4n;FQOk3zwSust-wAck_@pN zI0y#b#?`L)G{q)?S{}w)%E~9`tST$u@HxsS*w>R1(`Sj16Q)=txqEP&S)Jrab$y!U zxu(~8T$K|Yw}pqGYwJo>9kQ)5C_IIbEh!5}Du8>jhST+gqhV1xjJt5`%2ARrv^zkP z`b2C{QJPwTa`Jmq=;ukmxdMtHDRoz(8D4;*!*ms#g#TkS%YBq56+M}s&)+>NKY&=w zT2@LVB9b>lt@GN9q|xCP1!*C)8fQhikYgm6&3-nL5}os}HB|N@?z`N8yD0Bc!!KIN z)D^U-K`9>z5cUA0w)#jE=l6p~L{nq4rv+{YS8DT;(hiAe20znRz>nUrrt!e>kvW= ze=F#y(OkX>bn+0ehiH~~uA;`d+_)qNI>HOWYex{T^5PXZPkGf7*Nz}vut6esoE6Z` zE7kv&`giCL!>E>PwdB%Qc>V8Le}?%>niKg{;5P)=!0)go0r%yGs>;(Lfm>h(3t~Nm z`z9)jh74K3`;fzqE<)n)JxuPBU`71pt4d)164RS<>|erN)Uuf->{0gwV#)fJ&F~98 zJnO0Ip71+RS#vFUB=cSRR)E)ibu#EQ_++|^x`e8J7R-DPaKb)H1Lm?DW$BPvxc~sG zQ&QFiZmU^EY%Xz=8)w3sT*(qLOPaC>HDA$%gQ{LYNbZ^W7H^0YO|r(_;}6YZCAsXq z>#yeGN0=0nDP0((w;~oHZ^X~g7Pz?4Z`RT)xd5ln<$Ge`Q>C2XO-N>jOiA(f#NrhV h4#{uX_|0q(VD2?n?l;`iGUqP&*Ssz7%8i{1{{RWISJwal literal 0 HcmV?d00001 diff --git a/transport/queue.py b/transport/queue.py new file mode 100644 index 0000000..846f591 --- /dev/null +++ b/transport/queue.py @@ -0,0 +1,200 @@ +import pika +from datetime import datetime +import re +import json +import os +from common import Reader, Writer +import json + +class MessageQueue: + """ + This class hierarchy is designed to handle interactions with a queue server using pika framework (our tests are based on rabbitmq) + :host + :uid identifier of the exchange + :qid identifier of the queue + """ + def __init__(self,**params): + self.host= params['host'] + self.uid = params['uid'] + self.qid = params['qid'] + + def isready(self): + #self.init() + resp = self.connection is not None and self.connection.is_open + self.close() + return resp + def close(self): + if self.connection.is_closed == False : + self.channel.close() + self.connection.close() + +class QueueWriter(MessageQueue,Writer): + """ + This class is designed to publish content to an AMQP (Rabbitmq) + The class will rely on pika to implement this functionality + + We will publish information to a given queue for a given exchange + """ + def __init__(self,**params): + #self.host= params['host'] + #self.uid = params['uid'] + #self.qid = params['queue'] + MessageQueue.__init__(self,**params); + + + def init(self,label=None): + properties = pika.ConnectionParameters(host=self.host) + self.connection = pika.BlockingConnection(properties) + self.channel = self.connection.channel() + self.info = self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True) + if label is None: + self.qhandler = self.channel.queue_declare(queue=self.qid,durable=True) + else: + self.qhandler = self.channel.queue_declare(queue=label,durable=True) + + self.channel.queue_bind(exchange=self.uid,queue=self.qhandler.method.queue) + + + + """ + This function writes a stream of data to the a given queue + @param object object to be written (will be converted to JSON) + @TODO: make this less chatty + """ + def write(self,**params): + xchar = None + if 'xchar' in params: + xchar = params['xchar'] + object = self.format(params['row'],xchar) + + label = params['label'] + self.init(label) + _mode = 2 + if isinstance(object,str): + stream = object + _type = 'text/plain' + else: + stream = json.dumps(object) + if 'type' in params : + _type = params['type'] + else: + _type = 'application/json' + + self.channel.basic_publish( + exchange=self.uid, + routing_key=label, + body=stream, + properties=pika.BasicProperties(content_type=_type,delivery_mode=_mode) + ); + self.close() + + def flush(self,label): + self.init(label) + _mode = 1 #-- Non persistent + self.channel.queue_delete( queue=label); + self.close() + +class QueueReader(MessageQueue,Reader): + """ + This class will read from a queue provided an exchange, queue and host + @TODO: Account for security and virtualhosts + """ + + def __init__(self,**params): + """ + @param host host + @param uid exchange identifier + @param qid queue identifier + """ + + #self.host= params['host'] + #self.uid = params['uid'] + #self.qid = params['qid'] + MessageQueue.__init__(self,**params); + if 'durable' in params : + self.durable = True + else: + self.durable = False + self.size = -1 + self.data = {} + def init(self,qid): + + properties = pika.ConnectionParameters(host=self.host) + self.connection = pika.BlockingConnection(properties) + self.channel = self.connection.channel() + self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True) + + self.info = self.channel.queue_declare(queue=qid,durable=True) + + + def callback(self,channel,method,header,stream): + """ + This is the callback function designed to process the data stream from the queue + + """ + + r = [] + if re.match("^\{|\[",stream) is not None: + r = json.loads(stream) + else: + + r = stream + + qid = self.info.method.queue + if qid not in self.data : + self.data[qid] = [] + + self.data[qid].append(r) + # + # We stop reading when the all the messages of the queue are staked + # + if self.size == len(self.data[qid]) or len(self.data[qid]) == self.info.method.message_count: + self.close() + + def read(self,size=-1): + """ + This function will read, the first message from a queue + @TODO: + Implement channel.basic_get in order to retrieve a single message at a time + Have the number of messages retrieved be specified by size (parameter) + """ + r = {} + self.size = size + # + # We enabled the reader to be able to read from several queues (sequentially for now) + # The qid parameter will be an array of queues the reader will be reading from + # + if isinstance(self.qid,basestring) : + self.qid = [self.qid] + for qid in self.qid: + self.init(qid) + # r[qid] = [] + + if self.info.method.message_count > 0: + + self.channel.basic_consume(self.callback,queue=qid,no_ack=False); + self.channel.start_consuming() + else: + + pass + #self.close() + # r[qid].append( self.data) + + return self.data +class QueueListener(QueueReader): + def init(self,qid): + properties = pika.ConnectionParameters(host=self.host) + self.connection = pika.BlockingConnection(properties) + self.channel = self.connection.channel() + self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True ) + + self.info = self.channel.queue_declare(passive=True,exclusive=True,queue=qid) + + self.channel.queue_bind(exchange=self.uid,queue=self.info.method.queue,routing_key=qid) + #self.callback = callback + def read(self): + + self.init(self.qid) + self.channel.basic_consume(self.callback,queue=self.qid,no_ack=True); + self.channel.start_consuming() + \ No newline at end of file diff --git a/transport/queue.pyc b/transport/queue.pyc new file mode 100644 index 0000000000000000000000000000000000000000..281d8a0d5add3ab3ec7945040a42288132433942 GIT binary patch literal 6988 zcmcIo&vP6{74Df`?P}Ljtk|)!u(1c1A?r$F1p)=-vW;U0k`%FGk6mR@5~gNnTH0C7 z&a9_rZ7WDs6!w*ZBgKXPAV&(0R1P_F;>wACfIk4f@Ab^Ah0OuWmNcz5-97!|_4mH_ zUbp_W-2d&3Uwsy->ZgVOSMiv?qKNV5s8p#9E4wOn)WlI6j&iCtS8cd;zn0o))pA>H zv_)>IbU{rzYNMkjUA56|zO=p3X-UNk>akLfl=}I`qKZ38FDl(tn;5v;H1t%ATKsjS zq1QC@Roqv)uQnYOA8Kk3apkuM%h*NoZ~Uwe9Ejy2cNLbpOeT8ZvZjN=*I}#+5$_gB z32_OvH?%R~mcBL9Gu`Niqg=yd&ZE%K3}!`%r8=CO3UEch##$mQDF61L%}@S@qTlP? z870Px($E-hl;|QXqS20rs#u$3E7P%8=H4jG;#7M{hQ)+YndF)A9wg<+3%z?>k7smo zUl-oYB-xfXO}0aCvj`{pL0)WogM3zar8cGU!a{q)&|uI!^NMgdOv=f<^H{{iQEti} zg;^5!l2~VDvWeAr`KDKnw5K0NSi_cPk9*JTA?xV(B5H)1Z8hca;k4 zi)&vTX=&LP0glzpdPxm&O(hYCHOYU>Jm9;}R)2v7&&!^i=v1eha&FTQ+cyI=Dk;o& zAgzt^iC#0MzOUD|l5!0vAHG!XzQLfD;*~^9Lj^HB*k}N5MUPN$q@{O;T6 zb+!=#*x^`=W4ovF6V>b3ReRVenp#A)18v+E{H+TR%051f@+{MGvat#Q~>rn$1t$nrF+up+g;X2D!Y6K9qca0jw)TX?W$s} z+T6Hp_a{oIUNTWfyU1 zFip(J11?Lzk0cak-n%z$-Sh_j6U1=`%jXFk7-T%lZ{snXMztgUv+uHQP|ai5@|^;jK zrLv`V@hbH}TUy&{yg;i$&tO4n7StFg>WbXu8RK-}GJ)bps>NzI)rL)V<%PKp)zYn;&8m;QUMPkxdo|x1QwXDX6)p!N-x(onr54g#7VP~KZQ4*eQ;n^;()o+i-gG zUOwr?XIn|iuGbL68aOSqxDA=%PBGIW5Oh8*uB2JZI-1@MKisoExDbA@eTGRE%Z{+9 zQ6Bpz@wN3&)Prm)4Tn0FPo@QUyeJcWaIZx5&?mix|Au>1&N1f*eqVQ%oOAAJS4@Iv z=6=K^{s{pm1e6hUj@s_1;!N$9Q45ylDN7rmwMrDiYXOcx1N`%z1tUNQil;fX)gC~C z_7;x|eTymOJs@*I?R8alfeHXnQHc*&57|!qdl|KZQPB?#U4FpnyK0PpZ&73fc)&4` z-B-Klb4WppV}N)`?E>`6YL`~wK2mLmUoPQALdn~MSFqJ}Q9fgqLIQ^(f8f(76I!DY z|Dzw4YtuALGXDhYsK#(QO_NB-XKifq%>NoHg^G)ORsv+fw%!rM4D)!$f1Rz23@eTd z1mRS9tW(fdu|qGSCFr#9BA6_{`Q#bFMS_sclwlTWS*a-tnSt}mCgRy-YAl>NG@`d8 z3M#s^AX2C^zhzsR^5g@$)LiX--EydN9kB30$i30 z9a1TI5e>=_SO9x9EI7h?!cfNADUC=@!RnUhSX({_Nn7RdBda{3$NvF}7ov^LbY@1M zU^Mai3@AI!QD=Q1hjo-wk@aj@JK5DMT@uTtdWvg|BY^NypSOP%#7|jbiZ)ZaJH;j zRNGxu#2nMUqW)49CBQ+$c|+Iq!3YM}p-)HI^jt+X5<~N+r)t^7&bB0}(%Bk|Y%cTp?eE6u~$$SM2ZLY;UJ|3_@4D@EL(v)I4Fcdk7} zzC&L!(octf6vZRkH?8OEo%k$3%9|^*1rTrRX8M+l@ zna&C4NUH;fd)QfVUu$1+gige0i72j8Q1wR$U^hBg;0X*U%i|d$U4e5&*!PYcpsTiQ z2MHw~4y(%dKD#SHBt8K2BoeGGRqzIsZnsr&S-LOMd^tAk6%!(_L2P*l2y^8dG}|ce z-erZqyX?7K^X)&d&^n*3lT4dg@>b(b!Ng`#n9{2bWuKS9mz4)0`+nHutkpfl1sim2 z=@K_^US8mGkjW|VQY5%b@_;l1hq#}s#|}%>@lKJ$eGsAx;g`)OLwloE^)t;7u{XpW z-c(1tbi>OIM*LiR1Mwcrcz8#LgQ%b`@V1mT^d%m9Tk5VjXB}(%bJLUcyf|B1$83M*jPyY;{k}C}#+JT-1A_9evJlTq z3qnLsg+%#ef`pw%;j`Mt#3T{1V4p+V(I6byrCzeP!&f++xYQ*u6hdG89!CIt^!Vzv H)w}-zcc+mT literal 0 HcmV?d00001 diff --git a/transport/s3.py b/transport/s3.py new file mode 100644 index 0000000..9b117db --- /dev/null +++ b/transport/s3.py @@ -0,0 +1,83 @@ +from datetime import datetime +import boto +import botocore +from smart_open import smart_open +from common import Reader, Writer +import json +from common import Reader, Writer + +class s3 : + """ + @TODO: Implement a search function for a file given a bucket?? + """ + def __init__(self,args) : + """ + This function will extract a file or set of files from s3 bucket provided + @param access_key + @param secret_key + @param path location of the file + @param filter filename or filtering elements + """ + try: + self.s3 = boto.connect_s3(args['access_key'],args['secret_key']) + self.bucket = self.s3.get_bucket(args['bucket'].strip(),validate=False) if 'bucket' in args else None + # self.path = args['path'] + self.filter = args['filter'] if 'filter' in args else None + self.filename = args['file'] if 'file' in args else None + + except Exception as e : + self.s3 = None + self.bucket = None + print (e) + + def buckets(self): + # def buckets(self): + pass + # """ + # This function is a wrapper around the bucket list of buckets for s3 + # """ + # return self.s3.get_all_buckets() + + +class s3Reader(s3,Reader) : + """ + Because s3 contains buckets and files, reading becomes a tricky proposition : + - list files if file is None + - stream content if file is Not None + @TODO: support read from all buckets, think about it + """ + def __init__(self,args) : + s3.__init__(self,args) + def files(self): + r = [] + try: + return [item.name for item in self.bucket if item.size > 0] + except Exception as e: + pass + return r + def stream(self,limit=-1): + """ + At this point we should stream a file from a given bucket + """ + key = self.bucket.get_key(self.filename.strip()) + if key is None : + yield None + else: + count = 0 + with smart_open(key) as remote_file: + for line in remote_file: + if count == limit and limit > 0 : + break + yield line + count += 1 + def read(self,limit=-1) : + if self.filename is None : + # + # returning the list of files because no one file was specified. + return self.files() + else: + return self.stream(10) + +class s3Writer(s3,Writer) : + def __init__(self,args) : + s3.__init__(self,args) diff --git a/transport/s3.pyc b/transport/s3.pyc new file mode 100644 index 0000000000000000000000000000000000000000..baa8a005edfa678a42f55262243510209f56b0a8 GIT binary patch literal 3141 zcmcImOK%)S5U!rx9q()$9B>FR5ikjdEKwjqJ_JRGm?XRu1Uk|!8oFUyM-Jx|{)7=*YGz>)Gi=ekJ=&^+>{aIL_{e@3o zfwNt9Cb8OPgu&$TnoCGu>9m$sJla5F;=CRQ!X8{lnsb zR?7?2wmlm4sQ#7Iu7~uylVQ;GiXD$W`@B?iA1f<;&XM^Y1d$5vZD!^mDvz@~5A_pU zCn*O_!h=8=ZNq9^UZGr9MQ9eAaE0T#dXx?IkPllLCv{SUNt$Y7;*s7yv}kl%Yx}Ro zabh=vAg|Iyl7c(zrj~mS?SvITM-Z?zWm4!67P+;o+z7QxvB3s}f#~3Jc|EvRkmhnM zFnyHdnLw4n=+3ik#{#;?z{%@cWh*%;t;$lTHa3e9tI661JZg?)s2E#kV`P}<(%t`UhOt}*x(`A^%p5o*s5IlMwwKJ} zd*f|6HIB0~vvGVJa`Ou7cU$TWb;Wy2z3t7Z0Dmc;HGKrvY~hdq3SM1yw=f&cgFNR_ zcFH+#&Vn^uH01T!{l*~9Rvb&D;<%`Wlbm09UmOpsG>)TF;G@?-4(|F-FD2|8GQ%13 zTfPcT%0)Oqi5IR~r|@8ip_`*K}GH8ln)8NJraTCgaLv0`sdl@7*xZjCI?BAalhP zq81r1A_tf>iivMS3;U;vovJmR&P>LP2D#o9Xo5m+l5-J7m6f9~S*s>C%xu%&raew@ za1w4d&5gLtpsDc?n&N_WCH#^w>EVKqi1!_&?kH@^j`yMF@Zm7Xuw$Ia-FLuj5bY}3 zzRV2wPNJI+-XwH0v@Ud9wA)de>Unwf=o^n7&eO_jIgj4glHPXb8l@KS5Fs9_)#(W@-QB$A!X(LkSXcq!Eemtsg$)Io{yG3^%-Xi?3?sRYxlrXcd;HXuceD^-ciuiAqU=wf` zd~6vmCLC882IpfSezTh7!?1y4GZbV9B+ck_IKyU%cjj6NJM5y)ZgZo=JcwTA)t8u@ zV8W_WXM!dPW}X$9MN)~NFDq#^DXry(TUus>bX=WQUDZ*NBStwtg31eET#ir&oQs7fQ%KVa3lK@v39u%0!uysp9zYIG;;8r- zVl6t)0;wX#S#*wH@jL@qj9Ts{U*KCB9;vxj&+BtEqti?ln7qzJx@gE8-xKl42d4{n z6n3LC&|^0Zo9+APr(Q5KJJv#-%w%=2qqPcq9ELprCM9!WB@qcbB z5!=I2B_-4IP+T}9FA{@ypm!IK9OEmug|WtGQzSnFy5RU{Kw8yWAS*v5(BO-zj3lY- supn~FV2M*F!HxLz7qCJi{-GO?A8{mQexCqJXVnRH3iOmWe|F*g-`&?v=Kufz literal 0 HcmV?d00001 diff --git a/transport/session.py b/transport/session.py new file mode 100644 index 0000000..5ca833a --- /dev/null +++ b/transport/session.py @@ -0,0 +1,66 @@ +from flask import request, session +from datetime import datetime +import re +from common import Reader, Writer +import json + +class HttpRequestReader(Reader): + """ + This class is designed to read data from an Http request file handler provided to us by flask + The file will be heald in memory and processed accordingly + NOTE: This is inefficient and can crash a micro-instance (becareful) + """ + + def __init__(self,**params): + self.file_length = 0 + try: + + #self.file = params['file'] + #self.file.seek(0, os.SEEK_END) + #self.file_length = self.file.tell() + + #print 'size of file ',self.file_length + self.content = params['file'].readlines() + self.file_length = len(self.content) + except Exception, e: + print "Error ... ",e + pass + + def isready(self): + return self.file_length > 0 + def read(self,size =-1): + i = 1 + for row in self.content: + i += 1 + if size == i: + break + yield row + +class HttpSessionWriter(Writer): + """ + This class is designed to write data to a session/cookie + """ + def __init__(self,**params): + """ + @param key required session key + """ + self.session = params['queue'] + self.session['sql'] = [] + self.session['csv'] = [] + self.tablename = re.sub('..+$','',params['filename']) + self.session['uid'] = params['uid'] + #self.xchar = params['xchar'] + + + def format_sql(self,row): + values = "','".join([col.replace('"','').replace("'",'') for col in row]) + return "".join(["INSERT INTO :table VALUES('",values,"');\n"]).replace(':table',self.tablename) + def isready(self): + return True + def write(self,**params): + label = params['label'] + row = params ['row'] + + if label == 'usable': + self.session['csv'].append(self.format(row,',')) + self.session['sql'].append(self.format_sql(row))