bug fixes, using boto3 instead of boto for s3 support

pull/21/head
Steve Nyemba 3 months ago
parent 3faee02fa2
commit 40f9c3930a

@@ -1,6 +1,6 @@
 __app_name__ = 'data-transport'
 __author__ = 'The Phi Technology'
-__version__= '2.2.0'
+__version__= '2.2.1'
 __email__ = "info@the-phi.com"
 __license__=f"""
 Copyright 2010 - 2024, Steve L. Nyemba

@@ -19,7 +19,7 @@ args = {
     "packages": find_packages(include=['info','transport', 'transport.*'])}
 args["keywords"]=['mongodb','duckdb','couchdb','rabbitmq','file','read','write','s3','sqlite']
-args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql']
+args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','termcolor','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql']
 args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git"
 args['scripts'] = ['bin/transport']
 # if sys.version_info[0] == 2 :

@@ -134,7 +134,7 @@ class get :
     """
     @staticmethod
    def reader (**_args):
-        if not _args or 'provider' not in _args:
+        if not _args or ('provider' not in _args and 'label' not in _args):
            _args['label'] = 'default'
            _args['context'] = 'read'
        return instance(**_args)
@@ -143,7 +143,7 @@ class get :
        """
        This function is a wrapper that will return a writer to a database. It disambiguates the interface
        """
-        if not _args :
+        if not _args or ('provider' not in _args and 'label' not in _args):
            _args['label'] = 'default'
            _args['context'] = 'write'
        return instance(**_args)
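Both factory methods now fall back to the connection registered under the label 'default' only when neither a provider nor a label is supplied; previously, reader would clobber an explicit label whenever provider was absent, and writer only defaulted on a completely empty call. A minimal sketch of the resulting call patterns, assuming the class is exposed as transport.get as in the published package and that a 'default' entry exists in the registry (the 'warehouse' label is hypothetical):

    import transport

    # No provider and no label: resolves to the registry entry labeled 'default'
    rd = transport.get.reader()                   # context='read'
    wr = transport.get.writer()                   # context='write'

    # An explicit label is now honored; 2.2.0's reader would have silently
    # reset it to 'default' because 'provider' was missing
    rd = transport.get.reader(label='warehouse')  # 'warehouse' is hypothetical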

@@ -5,8 +5,8 @@ Steve L. Nyemba, The Phi Technology LLC
 This file is a wrapper around s3 bucket provided by AWS for reading and writing content
 """
 from datetime import datetime
-import boto
-from boto.s3.connection import S3Connection, OrdinaryCallingFormat
+import boto3
+# from boto.s3.connection import S3Connection, OrdinaryCallingFormat
 import numpy as np
 import botocore
 from smart_open import smart_open
@@ -14,6 +14,7 @@ import sys
 import json
 from io import StringIO
+import pandas as pd
 import json
 class s3 :
@@ -29,46 +30,37 @@ class s3 :
            @param filter filename or filtering elements
        """
        try:
-            self.s3 = S3Connection(args['access_key'],args['secret_key'],calling_format=OrdinaryCallingFormat())
-            self.bucket = self.s3.get_bucket(args['bucket'].strip(),validate=False) if 'bucket' in args else None
-            # self.path = args['path']
-            self.filter = args['filter'] if 'filter' in args else None
-            self.filename = args['file'] if 'file' in args else None
-            self.bucket_name = args['bucket'] if 'bucket' in args else None
+            self._client = boto3.client('s3',aws_access_key_id=args['access_key'],aws_secret_access_key=args['secret_key'],region_name=args['region'])
+            self._bucket_name = args['bucket']
+            self._file_name = args['file']
+            self._region = args['region']
        except Exception as e :
-            self.s3 = None
-            self.bucket = None
            print (e)
+            pass
+    def has(self,**_args):
+        _found = None
+        try:
+            if 'file' in _args and 'bucket' in _args:
+                _found = self.meta(**_args)
+            elif 'bucket' in _args and not 'file' in _args:
+                _found = self._client.list_objects(Bucket=_args['bucket'])
+            elif 'file' in _args and not 'bucket' in _args :
+                _found = self.meta(bucket=self._bucket_name,file = _args['file'])
+        except Exception as e:
+            _found = None
+            pass
+        return type(_found) == dict
    def meta(self,**args):
        """
+        This function will return information either about the file in a given bucket
        :name name of the bucket
        """
-        info = self.list(**args)
-        [item.open() for item in info]
-        return [{"name":item.name,"size":item.size} for item in info]
-    def list(self,**args):
-        """
-        This function will list the content of a bucket, the bucket must be provided by the name
-        :name name of the bucket
-        """
-        return list(self.s3.get_bucket(args['name']).list())
-    def buckets(self):
-        #
-        # This function will return all buckets, not sure why but it should be used cautiously
-        # based on why the s3 infrastructure is used
-        #
-        return [item.name for item in self.s3.get_all_buckets()]
-        # def buckets(self):
-        pass
-        # """
-        # This function is a wrapper around the bucket list of buckets for s3
-        # """
-        # return self.s3.get_all_buckets()
+        _bucket = self._bucket_name if 'bucket' not in args else args['bucket']
+        _file = self._file_name if 'file' not in args else args['file']
+        _data = self._client.get_object(Bucket=_bucket,Key=_file)
+        return _data['ResponseMetadata']
+    def close(self):
+        self._client.close()
 class Reader(s3) :
    """
@@ -77,51 +69,63 @@ class Reader(s3) :
    - stream content if file is Not None
    @TODO: support read from all buckets, think about it
    """
-    def __init__(self,**args) :
-        s3.__init__(self,**args)
-    def files(self):
-        r = []
-        try:
-            return [item.name for item in self.bucket if item.size > 0]
-        except Exception as e:
-            pass
-        return r
-    def stream(self,limit=-1):
+    def __init__(self,**_args) :
+        super().__init__(**_args)
+    def _stream(self,**_args):
        """
        At this point we should stream a file from a given bucket
        """
-        key = self.bucket.get_key(self.filename.strip())
-        if key is None :
-            yield None
+        _object = self._client.get_object(Bucket=_args['bucket'],Key=_args['file'])
+        _stream = None
+        try:
+            _stream = _object['Body'].read()
+        except Exception as e:
+            pass
+        if not _stream :
+            return None
+        if _object['ContentType'] in ['text/csv'] :
+            return pd.read_csv(StringIO(str(_stream).replace("\\n","\n").replace("\\r","").replace("\'","")))
        else:
-            count = 0
-            with smart_open(key) as remote_file:
-                for line in remote_file:
-                    if count == limit and limit > 0 :
-                        break
-                    yield line
-                    count += 1
+            return _stream
    def read(self,**args) :
-        if self.filename is None :
-            #
-            # returning the list of files because no one file was specified.
-            return self.files()
-        else:
-            limit = args['size'] if 'size' in args else -1
-            return self.stream(limit)
+        _name = self._file_name if 'file' not in args else args['file']
+        _bucket = args['bucket'] if 'bucket' in args else self._bucket_name
+        return self._stream(bucket=_bucket,file=_name)
 class Writer(s3) :
-    def __init__(self,**args) :
-        s3.__init__(self,**args)
-    def mkdir(self,name):
+    """
+    """
+    def __init__(self,**_args) :
+        super().__init__(**_args)
+        #
+        #
+        if not self.has(bucket=self._bucket_name) :
+            self.make_bucket(self._bucket_name)
+    def make_bucket(self,bucket_name):
        """
-        This function will create a folder in a bucket
+        This function will create a folder in a bucket,It is best that the bucket is organized as a namespace
        :name name of the folder
        """
-        self.s3.put_object(Bucket=self.bucket_name,key=(name+'/'))
-    def write(self,content):
-        file = StringIO(content.decode("utf8"))
-        self.s3.upload_fileobj(file,self.bucket_name,self.filename)
+        self._client.create_bucket(Bucket=bucket_name,CreateBucketConfiguration={'LocationConstraint': self._region})
+    def write(self,_data,**_args):
+        """
+        This function will write the data to the s3 bucket, files can be either csv, or json formatted files
+        """
+        if type(_data) == pd.DataFrame :
+            _stream = _data.to_csv(index=False)
+        elif type(_data) == dict :
+            _stream = json.dumps(_data)
+        else:
+            _stream = _data
+        file = StringIO(_stream)
+        bucket = self._bucket_name if 'bucket' not in _args else _args['bucket']
+        file_name = self._file_name if 'file' not in _args else _args['file']
+        self._client.put_object(Bucket=bucket, Key = file_name, Body=_stream)
        pass
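An end-to-end sketch of the rewritten pair, with placeholder credentials and names: the Writer creates the bucket on first use, and write serializes DataFrames to CSV and dicts to JSON before upload. Note that put_object is called without a ContentType, so a round-trip read returns raw bytes unless the object was stored elsewhere with ContentType='text/csv':

    import pandas as pd

    _conf = dict(access_key='AKIA...', secret_key='...', region='us-east-1',
                 bucket='demo-bucket', file='sales.csv')

    w = Writer(**_conf)   # creates 'demo-bucket' when has() reports it missing
    w.write(pd.DataFrame({'qty': [1, 2], 'price': [9.99, 24.50]}))
    w.close()

    r = Reader(**_conf)
    data = r.read()       # bytes here; a DataFrame only when ContentType is 'text/csv'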
