bug fixes: sqlite writer with sqlalchemy

pull/10/head
Steve Nyemba 9 months ago
parent 35b261edbf
commit e4faac7d6e

@ -1,5 +1,5 @@
__author__ = 'The Phi Technology' __author__ = 'The Phi Technology'
__version__= '1.9.6' __version__= '1.9.8'
__license__=""" __license__="""

@ -50,7 +50,7 @@ else:
import s3 import s3
import sql import sql
import etl import etl
from version import __version__ from info import __version__,__author__
import providers import providers
# import psycopg2 as pg # import psycopg2 as pg
# import mysql.connector as my # import mysql.connector as my

@ -13,7 +13,8 @@ import sqlite3
import pandas as pd import pandas as pd
from multiprocessing import Lock from multiprocessing import Lock
from transport.common import Reader, Writer, IEncoder from transport.common import Reader, Writer, IEncoder
import sqlalchemy
from sqlalchemy import create_engine
class DiskReader(Reader) : class DiskReader(Reader) :
""" """
This class is designed to read data from disk (location on hard drive) This class is designed to read data from disk (location on hard drive)
@ -22,6 +23,7 @@ class DiskReader(Reader) :
def __init__(self,**params): def __init__(self,**params):
""" """
@param path absolute path of the file to be read @param path absolute path of the file to be read
""" """
@ -111,6 +113,8 @@ class SQLite :
self.conn = sqlite3.connect(self.path,isolation_level="IMMEDIATE") self.conn = sqlite3.connect(self.path,isolation_level="IMMEDIATE")
self.conn.row_factory = sqlite3.Row self.conn.row_factory = sqlite3.Row
self.fields = _args['fields'] if 'fields' in _args else [] self.fields = _args['fields'] if 'fields' in _args else []
path = self._path
self._engine = create_engine(f'sqlite://{path}')
def has (self,**_args): def has (self,**_args):
found = False found = False
try: try:
@ -207,7 +211,18 @@ class SQLiteWriter(SQLite,DiskWriter) :
# #
# If the table doesn't exist we should create it # If the table doesn't exist we should create it
# #
def write(self,info,**_args): def write(self,_data,**_args):
SQLiteWriter.LOCK.acquire()
try:
if type(_data) == dict :
_data = [_data]
_table = self.table if 'table' not in _args else _args['table']
_df = pd.DataFrame(_data)
_df.to_sql(_table,self._engine.connect())
except Exception as e:
print (e)
SQLiteWriter.LOCK.release()
def _write(self,info,**_args):
""" """
""" """

Loading…
Cancel
Save