From e4faac7d6e3a62b5d73c6f109942bac3a81dbad9 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Fri, 2 Feb 2024 18:11:20 -0600 Subject: [PATCH] bug fixes: sqlite writer with sqlalchemy --- info/__init__.py | 2 +- transport/__init__.py | 2 +- transport/disk.py | 19 +++++++++++++++++-- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/info/__init__.py b/info/__init__.py index d5e703a..1ff6503 100644 --- a/info/__init__.py +++ b/info/__init__.py @@ -1,5 +1,5 @@ __author__ = 'The Phi Technology' -__version__= '1.9.6' +__version__= '1.9.8' __license__=""" diff --git a/transport/__init__.py b/transport/__init__.py index 7b37a12..c820978 100644 --- a/transport/__init__.py +++ b/transport/__init__.py @@ -50,7 +50,7 @@ else: import s3 import sql import etl - from version import __version__ + from info import __version__,__author__ import providers # import psycopg2 as pg # import mysql.connector as my diff --git a/transport/disk.py b/transport/disk.py index 424e95e..2f39867 100644 --- a/transport/disk.py +++ b/transport/disk.py @@ -13,7 +13,8 @@ import sqlite3 import pandas as pd from multiprocessing import Lock from transport.common import Reader, Writer, IEncoder - +import sqlalchemy +from sqlalchemy import create_engine class DiskReader(Reader) : """ This class is designed to read data from disk (location on hard drive) @@ -22,6 +23,7 @@ class DiskReader(Reader) : def __init__(self,**params): """ + @param path absolute path of the file to be read """ @@ -111,6 +113,8 @@ class SQLite : self.conn = sqlite3.connect(self.path,isolation_level="IMMEDIATE") self.conn.row_factory = sqlite3.Row self.fields = _args['fields'] if 'fields' in _args else [] + path = self._path + self._engine = create_engine(f'sqlite://{path}') def has (self,**_args): found = False try: @@ -207,7 +211,18 @@ class SQLiteWriter(SQLite,DiskWriter) : # # If the table doesn't exist we should create it # - def write(self,info,**_args): + def write(self,_data,**_args): + SQLiteWriter.LOCK.acquire() + try: + if type(_data) == dict : + _data = [_data] + _table = self.table if 'table' not in _args else _args['table'] + _df = pd.DataFrame(_data) + _df.to_sql(_table,self._engine.connect()) + except Exception as e: + print (e) + SQLiteWriter.LOCK.release() + def _write(self,info,**_args): """ """