@@ -96,6 +96,7 @@ class Learner(Process):
         #
         # Below is a source of inefficiency, unfortunately python's type inference doesn't work well in certain cases
         # - The code below tries to address the issue (Perhaps better suited for the reading components)
+        _log = {}
         for name in columns :
             _index = np.random.choice(np.arange(self._df[name].size),5,False)
             no_value = [type(value) in [int,float,np.int64,np.int32,np.float32,np.float64] for value in self._df[name].values[_index]]
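Aside (not part of the patch): the loop above infers whether a column is numeric by sampling five cells and type-checking them; the boolean flags are presumably collapsed to a scalar fill value (0 for numeric columns, '' for text) before the fillna call shown in the next hunk, since that reduction happens in lines the patch does not touch. A minimal standalone sketch of the heuristic, with a hypothetical frame standing in for self._df:

    import numpy as np
    import pandas as pd

    df = pd.DataFrame({'age': [34, None, 51, 28, 19], 'gender': ['F', 'M', None, 'F', 'M']})  # hypothetical

    for name in df.columns:
        # sample a few cells without replacement and test whether they carry numeric types
        _index = np.random.choice(np.arange(df[name].size), min(5, df[name].size), False)
        no_value = [type(value) in [int, float, np.int64, np.int32, np.float32, np.float64]
                    for value in df[name].values[_index]]
        no_value = 0 if np.sum(no_value) > 0 else ''   # assumed reduction step, outside the shown hunks
        df[name] = df[name].fillna(no_value)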
@@ -103,7 +104,9 @@ class Learner(Process):
             self._df[name] = self._df[name].fillna(no_value)
+            _log[name] = self._df[name].dtypes.name
+        _log = {'action':'structure','input':_log}
+        self.log(**_log)
         #
         # convert the data to binary here ...
         _schema = self.get_schema()
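Aside (not part of the patch): the added lines unpack a plain dict into self.log, so the per-column dtype map travels as the input field of a 'structure' event. With hypothetical column names and a stub logger:

    def log(**_args):          # stand-in for Learner.log, illustration only
        print(_args)

    _log = {'age': 'int64', 'gender': 'object'}     # hypothetical dtype map built by the loop
    log(**{'action': 'structure', 'input': _log})
    # -> {'action': 'structure', 'input': {'age': 'int64', 'gender': 'object'}}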
@@ -293,46 +296,52 @@ class Generator (Learner):
             name = _item['name']
             if _item['type'].upper() in ['DATE','DATETIME','TIMESTAMP'] :
-                FORMAT = '%Y-%m-%d'
-                try:
-                    #
-                    #-- Sometimes data isn't all it's meant to be
-                    SIZE = -1
-                    if 'format' in self.info and name in self.info['format'] :
-                        FORMAT = self.info['format'][name]
-                        SIZE = 10
-                    elif _item['type'] in ['DATETIME','TIMESTAMP'] :
-                        FORMAT = '%Y-%m-%d %H:%M:%S'
-                        SIZE = 19
-                    if SIZE > 0 :
-                        values = pd.to_datetime(_df[name], format=FORMAT).astype(str)
-                        _df[name] = [_date[:SIZE] for _date in values]
-                    r[name] = FORMAT
-                    if _item['type'] in ['DATETIME','TIMESTAMP'] :
-                        pass #;_df[name] = _df[name].fillna('').astype('datetime64[ns]')
-                except Exception as e:
-                    pass
-                finally:
-                    pass
+                FORMAT = '%m-%d-%Y'
+                # try:
+                #     #
+                #     #-- Sometimes data isn't all it's meant to be
+                #     SIZE = -1
+                #     if 'format' in self.info and name in self.info['format'] :
+                #         FORMAT = self.info['format'][name]
+                #         SIZE = 10
+                #     elif _item['type'] in ['DATETIME','TIMESTAMP'] :
+                #         FORMAT = '%m-%d-%Y %H:%M:%S'
+                #         SIZE = 19
+                #     if SIZE > 0 :
+                #         values = pd.to_datetime(_df[name], format=FORMAT).astype(str)
+                #         _df[name] = [_date[:SIZE].strip() for _date in values]
+                # #     _df[name] = _df[name].astype(str)
+                #     _df[name] = pd.to_datetime(_df[name], format=FORMAT) #.astype('datetime64[ns]')
+                #     r[name] = FORMAT
+                # #   _df[name] = pd.to_datetime(_df[name], format=FORMAT) #.astype('datetime64[ns]')
+                #     if _item['type'] in ['DATETIME','TIMESTAMP']:
+                #         pass #;_df[name] = _df[name].fillna('').astype('datetime64[ns]')
+                # except Exception as e:
+                #     pass
+                # finally:
+                #     pass
             else :
                 #
                 # Because types are inferred on the basis of the sample being processed they can sometimes be wrong
                 # To help disambiguate we add the schema information
                 _type = None
                 if 'int' in _df[name].dtypes.name or 'int' in _item['type'].lower() :
                     _type = np.int
                 elif 'float' in _df[name].dtypes.name or 'float' in _item['type'].lower() :
                     _type = np.float
                 if _type :
-                    _df[name] = _df[name].fillna(0).replace('',0).astype(_type)
+                    _df[name] = _df[name].fillna(0).replace('',0).replace('NA',0).replace('nan',0).astype(_type)
+                # else:
+                #     _df[name] = _df[name].astype(str)
         # _df = _df.replace('NaT','').replace('NA','')
         if r :
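Aside (not part of the patch): the DATE/DATETIME branch that this hunk comments out relied on strftime-style format strings plus fixed-width truncation (a bare date is 10 characters, a full timestamp 19). A standalone sketch of that behavior on a hypothetical column:

    import pandas as pd

    s = pd.Series(['2021-03-05 14:22:01', '2021-07-19 08:00:45'])   # hypothetical timestamps

    FORMAT, SIZE = '%Y-%m-%d %H:%M:%S', 10      # SIZE=19 keeps the full timestamp, 10 keeps the date only
    values = pd.to_datetime(s, format=FORMAT).astype(str)
    print([v[:SIZE] for v in values])           # ['2021-03-05', '2021-07-19']

Separately, the np.int and np.float aliases used in the numeric branch are deprecated since NumPy 1.20 and removed in 1.24, so plain int/float (or np.int64/np.float64) would be the safer spellings if that branch is revisited.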
@@ -373,10 +382,19 @@ class Generator (Learner):
         _schema = self.get_schema()
         _schema = [{'name':_item.name,'type':_item.field_type} for _item in _schema]
         _df = self.format(_df,_schema)
+        _log = [{"name":_schema[i]['name'],"dataframe":_df[_df.columns[i]].dtypes.name,"schema":_schema[i]['type']} for i in np.arange(len(_schema))]
+        self.log(**{"action":"consolidate","input":_log})
+        # w = transport.factory.instance(doc='observation',provider='mongodb',context='write',db='IOV01_LOGS',auth_file='/home/steve/dev/transport/mongo.json')
+        # w.write(_df)
+        # print (_df[cols])
         writer = transport.factory.instance(**_store)
         writer.write(_df,schema=_schema)
+        # _df.to_csv('foo.csv')
         self.log(**{'action':'write','input':{'rows':N,'candidates':len(_candidates)}})
 class Shuffle (Generator):
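Aside (not part of the patch): writing goes through the data-transport factory; the keys below mirror the mongodb example left commented out in the last hunk and are only illustrative, since the accepted parameters depend on the chosen provider and the installed data-transport version:

    import pandas as pd
    import transport            # data-transport package, provides transport.factory

    _df = pd.DataFrame({'id': [1, 2], 'value': ['a', 'b']})     # hypothetical frame

    # hypothetical store settings, modeled on the commented-out mongodb example above
    _store = {'provider': 'mongodb', 'context': 'write', 'db': 'demo', 'doc': 'observation'}

    writer = transport.factory.instance(**_store)
    writer.write(_df)           # the patch also passes schema=_schema for schema-aware targets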