diff --git a/bin/transport b/bin/transport index 2225f3b..dd424a2 100755 --- a/bin/transport +++ b/bin/transport @@ -46,6 +46,7 @@ import time from multiprocessing import Process import typer import os +import transport from transport import etl from transport import providers @@ -88,7 +89,7 @@ def move (path,index=None): _config = _config[ int(index)] etl.instance(**_config) else: - etl.instance(_config) + etl.instance(config=_config) # # if type(_config) == dict : @@ -109,19 +110,30 @@ def move (path,index=None): # jobs.append(thread()) # if _config.index(_args) == 0 : # thread.join() - wait(jobs) - + # wait(jobs) +@app.command() +def version(): + print (transport.version.__version__) @app.command() def generate (path:str): - __doc__=""" - """ - _config = [{"source":{"provider":"http","url":"https://cdn.wsform.com/wp-content/uploads/2020/06/agreement.csv"},"target":{"provider":"file","path":"addresses.csv","delimiter":"csv"}}] + This function will generate a configuration template to give a sense of how to create one + """ + _config = [ + { + "source":{"provider":"http","url":"https://raw.githubusercontent.com/codeforamerica/ohana-api/master/data/sample-csv/addresses.csv"}, + "target": + [{"provider":"file","path":"addresses.csv","delimiter":"csv"},{"provider":"sqlite","database":"sample.db3","table":"addresses"}] + } + ] file = open(path,'w') file.write(json.dumps(_config)) file.close() - -# if __name__ == '__main__' : +@app.command() +def usage(): + print (__doc__) +if __name__ == '__main__' : + app() # # # # Load information from the file ... # if 'help' in SYS_ARGS : diff --git a/transport/disk.py b/transport/disk.py index 8514e3f..a3880ec 100644 --- a/transport/disk.py +++ b/transport/disk.py @@ -62,34 +62,25 @@ class DiskWriter(Writer): """ THREAD_LOCK = Lock() def __init__(self,**params): - Writer.__init__(self) - self.cache['meta'] = {'cols':0,'rows':0,'delimiter':None} - if 'path' in params: - self.path = params['path'] - else: - self.path = 'data-transport.log' - self.delimiter = params['delimiter'] if 'delimiter' in params else None - # if 'name' in params: - # self.name = params['name']; - # else: - # self.name = 'data-transport.log' - # if os.path.exists(self.path) == False: - # os.mkdir(self.path) - def meta(self): - return self.cache['meta'] - def isready(self): - """ - This function determines if the class is ready for execution or not - i.e it determines if the preconditions of met prior execution - """ - return True - # p = self.path is not None and os.path.exists(self.path) - # q = self.name is not None - # return p and q - def format (self,row): - self.cache['meta']['cols'] += len(row) if isinstance(row,list) else len(row.keys()) - self.cache['meta']['rows'] += 1 - return (self.delimiter.join(row) if self.delimiter else json.dumps(row))+"\n" + super().__init__() + self._path = params['path'] + self._delimiter = params['delimiter'] + + # def meta(self): + # return self.cache['meta'] + # def isready(self): + # """ + # This function determines if the class is ready for execution or not + # i.e it determines if the preconditions of met prior execution + # """ + # return True + # # p = self.path is not None and os.path.exists(self.path) + # # q = self.name is not None + # # return p and q + # def format (self,row): + # self.cache['meta']['cols'] += len(row) if isinstance(row,list) else len(row.keys()) + # self.cache['meta']['rows'] += 1 + # return (self.delimiter.join(row) if self.delimiter else json.dumps(row))+"\n" def write(self,info,**_args): """ This function writes a record to a designated file @@ -97,21 +88,30 @@ class DiskWriter(Writer): @param row row to be written """ try: + _mode = 'a' if 'overwrite' not in _args else 'w' DiskWriter.THREAD_LOCK.acquire() - f = open(self.path,_mode) - if self.delimiter : - if type(info) == list : - for row in info : - f.write(self.format(row)) - else: - f.write(self.format(info)) - else: - if not type(info) == str : - f.write(json.dumps(info)+"\n") - else: - f.write(info) - f.close() + # # _path = _args['path'] if 'path' in _args else self.path + # # _delim= _args['delimiter'] if 'delimiter' in _args else self._delimiter + # # info.to_csv(_path,sep=_delim) + # info.to_csv(self.path) + # f = open(self.path,_mode) + # if self.delimiter : + # if type(info) == list : + # for row in info : + # f.write(self.format(row)) + # else: + # f.write(self.format(info)) + # else: + # if not type(info) == str : + # f.write(json.dumps(info)+"\n") + # else: + # f.write(info) + # f.close() + _delim = self._delimiter if 'delimiter' not in _args else _args['delimiter'] + _path = self.path if 'path' not in _args else _args['path'] + info.to_csv(_path,index=False,sep=_delim) + pass except Exception as e: # # Not sure what should be done here ... @@ -220,16 +220,19 @@ class SQLiteWriter(SQLite,DiskWriter) : # # If the table doesn't exist we should create it # - def write(self,info): + def write(self,info,**_args): """ """ if not self.fields : + if type(info) == pd.DataFrame : + _columns = list(info.columns) self.init(list(info.keys())) if type(info) == dict : info = [info] elif type(info) == pd.DataFrame : + info = info.fillna('') info = info.to_dict(orient='records') SQLiteWriter.LOCK.acquire() diff --git a/transport/etl.py b/transport/etl.py index dac58c4..aa4a73e 100644 --- a/transport/etl.py +++ b/transport/etl.py @@ -70,7 +70,7 @@ class Transporter(Process): # self._onerror = _args['onError'] self._source = _args['source'] self._target = _args['target'] - + # # Let's insure we can support multiple targets self._target = [self._target] if type(self._target) != list else self._target @@ -90,16 +90,18 @@ class Transporter(Process): This function will write a data-frame to a designated data-store, The function is built around a delegation design pattern :data data-frame or object to be written """ - for _target in self._target : - if 'write' not in _target : - _target['context'] = 'write' - _target['lock'] = True - else: - _target['write']['lock'] = True - _writer = transport.factory.instance(**_target) - _writer.write(_data,**_args) - if hasattr(_writer,'close') : - _writer.close() + if _data.shape[0] > 0 : + for _target in self._target : + if 'write' not in _target : + _target['context'] = 'write' + # _target['lock'] = True + else: + # _target['write']['lock'] = True + pass + _writer = transport.factory.instance(**_target) + _writer.write(_data.copy(),**_args) + if hasattr(_writer,'close') : + _writer.close() def write(self,_df,**_args): """ @@ -109,12 +111,12 @@ class Transporter(Process): # _df = self.read() _segments = np.array_split(np.range(_df.shape[0]),SEGMENT_COUNT) if _df.shape[0] > MAX_ROWS else np.array( [np.arange(_df.shape[0])]) # _index = 0 - + for _indexes in _segments : _fwd_args = {} if not _args else _args - self._delegate_write(_df.iloc[_indexes],**_fwd_args) + self._delegate_write(_df.iloc[_indexes]) # # @TODO: Perhaps consider writing up each segment in a thread/process (speeds things up?) pass