You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

135 lines
4.6 KiB
Python

"""
This file handles state-space of the data training/generation process i.e Upon specification of the pre/post conditiions
"""
"""
This file handles state-space of the data training/generation process i.e Upon specification of the pre/post conditions,
The specifications for this are as follows (within an entry of the configuration)
{
"state":{
"pre":[{"approximate":{"field":"int"}},{"newdate":{"field":"format"}}],"post":[{"limit":10}]
}
}
"""
import importlib
import importlib.util
import sys
from datetime import datetime
from data.maker.state.default import *
import os
class State :
@staticmethod
def apply(_data,lpointers):
"""
This function applies a pipeline against a given data-frame, the calling code must decide whether it is a pre/post
:_data data-frame
:_lpointers functions modules returned by instance (module,_args)
"""
for _item in lpointers :
if _item is None :
continue
pointer = _item['module']
if type(pointer).__name__ != 'function':
_args = _item['args'] if 'args' in _item else {}
else:
pointer = _item['module']
_args = _item['args'] if 'args' in _item else {}
_data = pointer(_data,_args)
return _data
@staticmethod
def instance(_args):
"""
"""
pre = []
post=[]
out = {}
for key in _args :
#
# If the item has a path property is should be ignored
path = _args[key]['path'] if 'path' in _args[key] else ''
# out[key] = [ State._build(dict(_item,**{'path':path})) if 'path' not in _item else State._build(_item) for _item in _args[key]['pipeline']]
out[key] = []
for _item in _args[key]['pipeline'] :
if type(_item).__name__ == 'function':
_stageInfo = {'module':_item,'name':_item.__name__,'args':{},'path':''}
pass
else:
if 'path' in _item :
_stageInfo = State._build(dict(_item,**{'path':path}))
else :
_stageInfo= State._build(_item)
out[key].append(_stageInfo)
# print ([out])
return out
# if 'pre' in _args:
# path = _args['pre']['path'] if 'path' in _args['pre'] else ''
# pre = [ State._build(dict(_item,**{'path':path})) for _item in _args['pre']['pipeline']]
# else:
# path = _args['post']['path'] if 'path' in _args['post'] else ''
# post = [ State._build(dict(_item,**{'path':path})) for _item in _args['post']['pipeline']]
# return {'pre':pre,'post':post}
@staticmethod
def _extract(_entry):
_name = list(set(_entry.keys()) - set(['path']) )
_name = _name[0]
path = _entry['path'] if 'path' in _entry and os.path.exists(_entry['path']) else ''
return {"module": _name,"args": _entry[_name],'name':_name,'path':path}
pass
@staticmethod
def _build(_args):
"""
This function builds the object {module,path} where module is extracted from a file (if needed)
:param _args dictionary containing attributes that can be value pair
It can also be a function
"""
#
# In the advent an actual pointer is passed we should do the following
_info = State._extract(_args)
# _info = dict(_args,**_info)
_info['module'] = State._instance(_info)
return _info if _info['module'] is not None else None
@staticmethod
def _instance(_args):
"""
:path optional path of the file on disk
:module name of the function
"""
_name = _args['module']
if 'path' in _args and os.path.exists(_args['path']):
path= _args['path']
spec = importlib.util.spec_from_file_location(_name, path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
else:
#
# Probably calling a built-in module (should be in this file)
module = sys.modules['data.maker.state.default']
return getattr(module,_name) if hasattr(module,_name) else None
#
# Adding a few custom functions that should be able to help ....
# These functions can be called without specifying a path
#