v2.2.0 bug fix - AWS-S3 #21

Merged
steve merged 3 commits from v2.2.0 into master 5 months ago

@ -1,6 +1,6 @@
__app_name__ = 'data-transport' __app_name__ = 'data-transport'
__author__ = 'The Phi Technology' __author__ = 'The Phi Technology'
__version__= '2.2.0' __version__= '2.2.1'
__email__ = "info@the-phi.com" __email__ = "info@the-phi.com"
__license__=f""" __license__=f"""
Copyright 2010 - 2024, Steve L. Nyemba Copyright 2010 - 2024, Steve L. Nyemba

@ -19,7 +19,7 @@ args = {
"packages": find_packages(include=['info','transport', 'transport.*'])} "packages": find_packages(include=['info','transport', 'transport.*'])}
args["keywords"]=['mongodb','duckdb','couchdb','rabbitmq','file','read','write','s3','sqlite'] args["keywords"]=['mongodb','duckdb','couchdb','rabbitmq','file','read','write','s3','sqlite']
args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql'] args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','termcolor','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql']
args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git" args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git"
args['scripts'] = ['bin/transport'] args['scripts'] = ['bin/transport']
# if sys.version_info[0] == 2 : # if sys.version_info[0] == 2 :

@ -134,7 +134,7 @@ class get :
""" """
@staticmethod @staticmethod
def reader (**_args): def reader (**_args):
if not _args or 'provider' not in _args: if not _args or ('provider' not in _args and 'label' not in _args):
_args['label'] = 'default' _args['label'] = 'default'
_args['context'] = 'read' _args['context'] = 'read'
return instance(**_args) return instance(**_args)
@ -143,7 +143,7 @@ class get :
""" """
This function is a wrapper that will return a writer to a database. It disambiguates the interface This function is a wrapper that will return a writer to a database. It disambiguates the interface
""" """
if not _args : if not _args or ('provider' not in _args and 'label' not in _args):
_args['label'] = 'default' _args['label'] = 'default'
_args['context'] = 'write' _args['context'] = 'write'
return instance(**_args) return instance(**_args)

@ -3,10 +3,13 @@ Data Transport - 1.0
Steve L. Nyemba, The Phi Technology LLC Steve L. Nyemba, The Phi Technology LLC
This file is a wrapper around s3 bucket provided by AWS for reading and writing content This file is a wrapper around s3 bucket provided by AWS for reading and writing content
TODO:
- Address limitations that will properly read csv if it is stored with content type text/csv
""" """
from datetime import datetime from datetime import datetime
import boto import boto3
from boto.s3.connection import S3Connection, OrdinaryCallingFormat # from boto.s3.connection import S3Connection, OrdinaryCallingFormat
import numpy as np import numpy as np
import botocore import botocore
from smart_open import smart_open from smart_open import smart_open
@ -14,6 +17,7 @@ import sys
import json import json
from io import StringIO from io import StringIO
import pandas as pd
import json import json
class s3 : class s3 :
@ -29,46 +33,37 @@ class s3 :
@param filter filename or filtering elements @param filter filename or filtering elements
""" """
try: try:
self.s3 = S3Connection(args['access_key'],args['secret_key'],calling_format=OrdinaryCallingFormat()) self._client = boto3.client('s3',aws_access_key_id=args['access_key'],aws_secret_access_key=args['secret_key'],region_name=args['region'])
self.bucket = self.s3.get_bucket(args['bucket'].strip(),validate=False) if 'bucket' in args else None self._bucket_name = args['bucket']
# self.path = args['path'] self._file_name = args['file']
self.filter = args['filter'] if 'filter' in args else None self._region = args['region']
self.filename = args['file'] if 'file' in args else None
self.bucket_name = args['bucket'] if 'bucket' in args else None
except Exception as e : except Exception as e :
self.s3 = None
self.bucket = None
print (e) print (e)
pass
def has(self,**_args):
_found = None
try:
if 'file' in _args and 'bucket' in _args:
_found = self.meta(**_args)
elif 'bucket' in _args and not 'file' in _args:
_found = self._client.list_objects(Bucket=_args['bucket'])
elif 'file' in _args and not 'bucket' in _args :
_found = self.meta(bucket=self._bucket_name,file = _args['file'])
except Exception as e:
_found = None
pass
return type(_found) == dict
def meta(self,**args): def meta(self,**args):
""" """
This function will return information either about the file in a given bucket
:name name of the bucket :name name of the bucket
""" """
info = self.list(**args) _bucket = self._bucket_name if 'bucket' not in args else args['bucket']
[item.open() for item in info] _file = self._file_name if 'file' not in args else args['file']
return [{"name":item.name,"size":item.size} for item in info] _data = self._client.get_object(Bucket=_bucket,Key=_file)
def list(self,**args): return _data['ResponseMetadata']
""" def close(self):
This function will list the content of a bucket, the bucket must be provided by the name self._client.close()
:name name of the bucket
"""
return list(self.s3.get_bucket(args['name']).list())
def buckets(self):
#
# This function will return all buckets, not sure why but it should be used cautiously
# based on why the s3 infrastructure is used
#
return [item.name for item in self.s3.get_all_buckets()]
# def buckets(self):
pass
# """
# This function is a wrapper around the bucket list of buckets for s3
# """
# return self.s3.get_all_buckets()
class Reader(s3) : class Reader(s3) :
""" """
@ -77,51 +72,66 @@ class Reader(s3) :
- stream content if file is Not None - stream content if file is Not None
@TODO: support read from all buckets, think about it @TODO: support read from all buckets, think about it
""" """
def __init__(self,**args) : def __init__(self,**_args) :
s3.__init__(self,**args) super().__init__(**_args)
def files(self):
r = [] def _stream(self,**_args):
try:
return [item.name for item in self.bucket if item.size > 0]
except Exception as e:
pass
return r
def stream(self,limit=-1):
""" """
At this point we should stream a file from a given bucket At this point we should stream a file from a given bucket
""" """
key = self.bucket.get_key(self.filename.strip()) _object = self._client.get_object(Bucket=_args['bucket'],Key=_args['file'])
if key is None : _stream = None
yield None try:
_stream = _object['Body'].read()
except Exception as e:
pass
if not _stream :
return None
if _object['ContentType'] in ['text/csv'] :
return pd.read_csv(StringIO(str(_stream).replace("\\n","\n").replace("\\r","").replace("\'","")))
else: else:
count = 0 return _stream
with smart_open(key) as remote_file:
for line in remote_file:
if count == limit and limit > 0 :
break
yield line
count += 1
def read(self,**args) : def read(self,**args) :
if self.filename is None :
# _name = self._file_name if 'file' not in args else args['file']
# returning the list of files because no one file was specified. _bucket = args['bucket'] if 'bucket' in args else self._bucket_name
return self.files() return self._stream(bucket=_bucket,file=_name)
else:
limit = args['size'] if 'size' in args else -1
return self.stream(limit)
class Writer(s3) : class Writer(s3) :
"""
def __init__(self,**args) :
s3.__init__(self,**args)
def mkdir(self,name):
""" """
This function will create a folder in a bucket def __init__(self,**_args) :
super().__init__(**_args)
#
#
if not self.has(bucket=self._bucket_name) :
self.make_bucket(self._bucket_name)
def make_bucket(self,bucket_name):
"""
This function will create a folder in a bucket,It is best that the bucket is organized as a namespace
:name name of the folder :name name of the folder
""" """
self.s3.put_object(Bucket=self.bucket_name,key=(name+'/'))
def write(self,content): self._client.create_bucket(Bucket=bucket_name,CreateBucketConfiguration={'LocationConstraint': self._region})
file = StringIO(content.decode("utf8")) def write(self,_data,**_args):
self.s3.upload_fileobj(file,self.bucket_name,self.filename) """
This function will write the data to the s3 bucket, files can be either csv, or json formatted files
"""
content = 'text/plain'
if type(_data) == pd.DataFrame :
_stream = _data.to_csv(index=False)
content = 'text/csv'
elif type(_data) == dict :
_stream = json.dumps(_data)
content = 'application/json'
else:
_stream = _data
file = StringIO(_stream)
bucket = self._bucket_name if 'bucket' not in _args else _args['bucket']
file_name = self._file_name if 'file' not in _args else _args['file']
self._client.put_object(Bucket=bucket, Key = file_name, Body=_stream,ContentType=content)
pass pass

Loading…
Cancel
Save