"""Amazon Simple Storage Service (S3) Driver."""
import logging
from typing import Dict, Iterable, List, Union
from urllib.parse import quote, urljoin
import boto3
from botocore.exceptions import ClientError, WaiterError, ParamValidationError
from inflection import camelize, underscore
from cloudstorage.base import (Blob, Container, ContentLength, Driver,
ExtraOptions, FileLike, FormPost, MetaData)
from cloudstorage.exceptions import (CloudStorageError, IsNotEmptyError,
NotFoundError)
from cloudstorage.helpers import file_content_type, validate_file_or_path
from cloudstorage.messages import *
logger = logging.getLogger(__name__)
class S3Driver(Driver):
    """Driver for interacting with Amazon Simple Storage Service (S3).

    .. code-block:: python

        from cloudstorage.drivers.amazon import S3Driver

        storage = S3Driver(key='<my-aws-access-key-id>',
                           secret='<my-aws-secret-access-key>',
                           region='us-east-1')
        # <Driver: S3 us-east-1>

    References:

    * `Boto 3 Docs <http://boto3.readthedocs.io>`_
    * `Amazon S3 REST API Introduction
      <https://docs.aws.amazon.com/AmazonS3/latest/API/Welcome.html>`_

    :param key: AWS Access Key ID.
    :type key: str

    :param secret: AWS Secret Access Key.
    :type secret: str

    :param region: (optional) Region to connect to. Defaults to `us-east-1`.
    :type region: str
    """
    name = 'S3'
    hash_type = 'md5'
    url = 'https://aws.amazon.com/s3/'

    def __init__(self, key: str, secret: str = None,
                 region: str = 'us-east-1') -> None:
        # AWS region identifiers are lowercase; normalize before validating.
        region = region.lower()
        super().__init__(key=key, secret=secret, region=region)

        self._session = boto3.Session(aws_access_key_id=key,
                                      aws_secret_access_key=secret,
                                      region_name=region)
        # session required for loading regions list
        if region not in self.regions:
            raise CloudStorageError(region_not_found % region)
[docs] def __iter__(self) -> Iterable[Container]:
for bucket in self.s3.buckets.all():
yield self._make_container(bucket)
[docs] def __len__(self) -> int:
buckets = [bucket for bucket in self.s3.buckets.all()]
return len(buckets)
@staticmethod
def _normalize_parameters(params: Dict[str, str],
normalizers: Dict[str, str]) -> Dict[str, str]:
normalized = params.copy()
for key, value in params.items():
normalized.pop(key)
if not value:
continue
key_inflected = camelize(underscore(key),
uppercase_first_letter=True)
# Only include parameters found in normalizers
key_overrider = normalizers.get(key_inflected.lower())
if key_overrider:
normalized[key_overrider] = value
return normalized
def _get_bucket(self, bucket_name: str, validate: bool = True):
"""Get a S3 bucket.
:param bucket_name: The Bucket's name identifier.
:type bucket_name: str
:param validate: If True, verify that the bucket exists.
:type validate: bool
:return: S3 bucket resource object.
:rtype: :class:`boto3.s3.Bucket`
:raises NotFoundError: If the bucket does not exist.
:raises CloudStorageError: Boto 3 client error.
"""
bucket = self.s3.Bucket(bucket_name)
if validate:
try:
response = self.s3.meta.client.head_bucket(Bucket=bucket_name)
logger.debug('response=%s' % response)
except ClientError as e:
error_code = int(e.response['Error']['Code'])
if error_code == 404:
raise NotFoundError(container_not_found % bucket_name)
raise CloudStorageError('%s: %s' % (
e.response['Error']['Code'],
e.response['Error']['Message']))
try:
bucket.wait_until_exists()
except WaiterError as e:
# TODO: QUESTION: Raise NotFoundError exception?
logging.error(e, exc_info=True)
return bucket
def _make_blob(self, container: Container, object_summary) -> Blob:
"""Convert S3 Object Summary to Blob instance.
:param container: The container that holds the blob.
:type container: :class:`.Container`
:param object_summary: S3 object summary.
:type object_summary: :class:`boto3.s3.ObjectSummary`
:return: A blob object.
:rtype: :class:`.Blob`
:raise NotFoundError: If the blob object doesn't exist.
"""
try:
name = object_summary.key
#: etag wrapped in quotes
checksum = etag = object_summary.e_tag.replace('"', '')
size = object_summary.size
acl = object_summary.Acl()
meta_data = object_summary.meta.data.get('Metadata', {})
content_disposition = object_summary.meta.data.get(
'ContentDisposition', None)
content_type = object_summary.meta.data.get('ContentType', None)
modified_at = object_summary.last_modified
created_at = None
expires_at = None # TODO: FEATURE: Delete at / expires at
except ClientError as e:
error_code = int(e.response['Error']['Code'])
if error_code == 404:
raise NotFoundError(blob_not_found % (container.name,
object_summary.key))
raise CloudStorageError('%s: %s' % (
e.response['Error']['Code'],
e.response['Error']['Message']))
return Blob(name=name, checksum=checksum, etag=etag, size=size,
container=container, driver=self, acl=acl,
meta_data=meta_data,
content_disposition=content_disposition,
content_type=content_type, created_at=created_at,
modified_at=modified_at, expires_at=expires_at)
def _make_container(self, bucket) -> Container:
"""Convert S3 Bucket to Container.
:param bucket: S3 bucket object.
:type bucket: :class:`boto3.s3.Bucket`
:return: The container if it exists.
:rtype: :class:`.Container`
"""
acl = bucket.Acl()
created_at = bucket.creation_date.astimezone(tz=None)
return Container(name=bucket.name, driver=self, acl=acl,
meta_data=None, created_at=created_at)
@property
def session(self) -> boto3.session.Session:
"""Amazon Web Services session.
:return: AWS session.
:rtype: :class:`boto3.session.Session`
"""
return self._session
# noinspection PyUnresolvedReferences
@property
def s3(self) -> boto3.resources.base.ServiceResource:
"""S3 service resource.
:return: The s3 resource instance.
:rtype: :class:`boto3.resources.base.ServiceResource`
"""
return self.session.resource(service_name='s3', region_name=self.region)
@property
def regions(self) -> List[str]:
return self.session.get_available_regions('s3')
[docs] def create_container(self, container_name: str, acl: str = None,
meta_data: MetaData = None) -> Container:
if meta_data:
logger.info(option_not_supported % 'meta_data')
# Required parameters
params = {
'Bucket': container_name,
}
if acl:
params['ACL'] = acl.lower()
# TODO: BUG: Creating S3 bucket in us-east-1
# See https://github.com/boto/boto3/issues/125
if self.region != 'us-east-1':
params['CreateBucketConfiguration'] = {
'LocationConstraint': self.region,
}
logger.debug('params=%s' % params)
try:
bucket = self.s3.create_bucket(**params)
except ParamValidationError as e:
msg = e.kwargs.get('report', container_name_invalid)
raise CloudStorageError(msg)
bucket.wait_until_exists()
return self._make_container(bucket)
[docs] def get_container(self, container_name: str) -> Container:
bucket = self._get_bucket(container_name)
return self._make_container(bucket)
[docs] def patch_container(self, container: Container) -> None:
raise NotImplementedError
[docs] def delete_container(self, container: Container) -> None:
bucket = self._get_bucket(container.name, validate=False)
try:
bucket.delete()
except ClientError as e:
error_code = e.response['Error']['Code']
if error_code == 'BucketNotEmpty':
raise IsNotEmptyError(container_not_empty % bucket.name)
raise
[docs] def container_cdn_url(self, container: Container) -> str:
bucket = self._get_bucket(container.name, validate=False)
endpoint_url = bucket.meta.client.meta.endpoint_url
return '%s/%s' % (endpoint_url, container.name)
[docs] def enable_container_cdn(self, container: Container) -> bool:
logger.warning(feature_not_supported % 'enable_container_cdn')
return False
[docs] def disable_container_cdn(self, container: Container) -> bool:
logger.warning(feature_not_supported % 'disable_container_cdn')
return False
[docs] def upload_blob(self, container: Container, filename: Union[str, FileLike],
blob_name: str = None, acl: str = None,
meta_data: MetaData = None, content_type: str = None,
content_disposition: str = None,
extra: ExtraOptions = None) -> Blob:
meta_data = {} if meta_data is None else meta_data
extra = {} if extra is None else extra
extra_args = self._normalize_parameters(extra, self._PUT_OBJECT_KEYS)
# Default arguments
if acl:
extra_args.setdefault('ACL', acl.lower())
extra_args.setdefault('Metadata', meta_data)
extra_args.setdefault('StorageClass', 'STANDARD')
if content_disposition:
extra_args['ContentDisposition'] = content_disposition
blob_name = blob_name or validate_file_or_path(filename)
# Boto uses application/octet-stream by default
if not content_type:
if isinstance(filename, str):
# TODO: QUESTION: Any advantages between filename vs blob_name?
extra_args['ContentType'] = file_content_type(filename)
else:
extra_args['ContentType'] = file_content_type(blob_name)
else:
extra_args['ContentType'] = content_type
logger.debug('extra_args=%s' % extra_args)
if isinstance(filename, str):
self.s3.Bucket(container.name).upload_file(Filename=filename,
Key=blob_name,
ExtraArgs=extra_args)
else:
self.s3.Bucket(container.name).upload_fileobj(Fileobj=filename,
Key=blob_name,
ExtraArgs=extra_args)
return self.get_blob(container, blob_name)
[docs] def get_blob(self, container: Container, blob_name: str) -> Blob:
object_summary = self.s3.ObjectSummary(bucket_name=container.name,
key=blob_name)
return self._make_blob(container, object_summary)
[docs] def get_blobs(self, container: Container) -> Iterable[Blob]:
bucket = self._get_bucket(container.name, validate=False)
for key in bucket.objects.all(): # s3.ObjectSummary
yield self._make_blob(container, key)
[docs] def download_blob(self, blob: Blob,
destination: Union[str, FileLike]) -> None:
if isinstance(destination, str):
self.s3.Bucket(name=blob.container.name).download_file(
Key=blob.name, Filename=destination, ExtraArgs={})
else:
self.s3.Bucket(name=blob.container.name).download_fileobj(
Key=blob.name, Fileobj=destination, ExtraArgs={})
[docs] def patch_blob(self, blob: Blob) -> None:
raise NotImplementedError
[docs] def delete_blob(self, blob: Blob) -> None:
# Required parameters
params = {
'Bucket': blob.container.name,
'Key': blob.name,
}
logger.debug('params=%s' % params)
try:
response = self.s3.meta.client.delete_object(**params)
logger.debug('response=%s' % response)
except ClientError as e:
error_code = int(e.response['Error']['Code'])
if error_code != 200 or error_code != 204:
raise NotFoundError(blob_not_found % (blob.name,
blob.container.name))
raise
[docs] def blob_cdn_url(self, blob: Blob) -> str:
container_url = self.container_cdn_url(blob.container)
blob_name_cleaned = quote(blob.name)
blob_path = '%s/%s' % (container_url, blob_name_cleaned)
url = urljoin(container_url, blob_path)
return url
[docs] def generate_container_upload_url(self, container: Container,
blob_name: str,
expires: int = 3600, acl: str = None,
meta_data: MetaData = None,
content_disposition: str = None,
content_length: ContentLength = None,
content_type: str = None,
extra: ExtraOptions = None) -> FormPost:
meta_data = {} if meta_data is None else meta_data
extra = {} if extra is None else extra
extra_norm = self._normalize_parameters(extra, self._POST_OBJECT_KEYS)
conditions = []
fields = {}
if acl:
conditions.append({'acl': acl})
fields['acl'] = acl
headers = {
'Content-Disposition': content_disposition,
'Content-Type': content_type,
}
for header_name, header_value in headers.items():
if not header_value:
continue
fields[header_name.lower()] = header_value
conditions.append(['eq', '$' + header_name, header_value])
# Add content-length-range which is a tuple
if content_length:
min_range, max_range = content_length
conditions.append(['content-length-range', min_range, max_range])
for meta_name, meta_value in meta_data.items():
meta_name = self._OBJECT_META_PREFIX + meta_name
fields[meta_name] = meta_value
conditions.append({meta_name: meta_value})
# Add extra conditions and fields
for extra_name, extra_value in extra_norm.items():
fields[extra_name] = extra_value
conditions.append({extra_name: extra_value})
return self.s3.meta.client.generate_presigned_post(
Bucket=container.name,
Key=blob_name,
Fields=fields,
Conditions=conditions,
ExpiresIn=int(expires))
[docs] def generate_blob_download_url(self, blob: Blob, expires: int = 3600,
method: str = 'GET',
content_disposition: str = None,
extra: ExtraOptions = None) -> str:
extra = extra if extra is not None else {}
params = self._normalize_parameters(extra, self._GET_OBJECT_KEYS)
# Required parameters
params['Bucket'] = blob.container.name
params['Key'] = blob.name
# Optional
if content_disposition:
params['ResponseContentDisposition'] = content_disposition
logger.debug('params=%s' % params)
return self.s3.meta.client.generate_presigned_url(
ClientMethod='get_object', Params=params, ExpiresIn=int(expires),
HttpMethod=method.lower())
_OBJECT_META_PREFIX = 'x-amz-meta-'
#: `S3.Client.generate_presigned_post
#: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html
#: #S3.Client.generate_presigned_post>`_
_POST_OBJECT_KEYS = {
'acl': 'acl',
'cachecontrol': 'Cache-Control',
'contenttype': 'Content-Type',
'contentdisposition': 'Content-Disposition',
'contentencoding': 'Content-Encoding',
'expires': 'Expires',
'successactionredirect': 'success_action_redirect',
'redirect': 'redirect',
'successactionstatus': 'success_action_status',
'xamzmeta': 'x-amz-meta-',
}
#: `#S3.Client.get_object
#: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html
#: #S3.Client.get_object>`_
_GET_OBJECT_KEYS = {
'bucket': 'Bucket',
'ifmatch': 'IfMatch',
'ifmodifiedsince': 'IfModifiedSince',
'ifnonematch': 'IfNoneMatch',
'ifunmodifiedsince': 'IfUnmodifiedSince',
'key': 'Key',
'range': 'Range',
'responsecachecontrol': 'ResponseCacheControl',
'responsecontentdisposition': 'ResponseContentDisposition',
'responsecontentencoding': 'ResponseContentEncoding',
'responsecontentlanguage': 'ResponseContentLanguage',
'responsecontenttype': 'ResponseContentType',
'responseexpires': 'ResponseExpires',
'versionid': 'VersionId',
'ssecustomeralgorithm': 'SSECustomerAlgorithm',
'ssecustomerkey': 'SSECustomerKey',
'requestpayer': 'RequestPayer',
'partnumber': 'PartNumber',
# Extra keys to standarize across all drivers
'cachecontrol': 'ResponseCacheControl',
'contentdisposition': 'ResponseContentDisposition',
'contentencoding': 'ResponseContentEncoding',
'contentlanguage': 'ResponseContentLanguage',
'contenttype': 'ResponseContentType',
'expires': 'ResponseExpires',
}
#: `S3.Client.put_object
#: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html
#: #S3.Client.put_object>`_
_PUT_OBJECT_KEYS = {
'acl': 'ACL',
'body': 'Body',
'bucket': 'Bucket',
'cachecontrol': 'CacheControl',
'contentdisposition': 'ContentDisposition',
'contentencoding': 'ContentEncoding',
'contentlanguage': 'ContentLanguage',
'contentlength': 'ContentLength',
'contentmd5': 'ContentMD5',
'contenttype': 'ContentType',
'expires': 'Expires',
'grantfullcontrol': 'GrantFullControl',
'grantread': 'GrantRead',
'grantreadacp': 'GrantReadACP',
'grantwriteacp': 'GrantWriteACP',
'key': 'Key',
'metadata': 'Metadata',
'serversideencryption': 'ServerSideEncryption',
'storageclass': 'StorageClass',
'websiteredirectlocation': 'WebsiteRedirectLocation',
'ssecustomeralgorithm': 'SSECustomerAlgorithm',
'ssecustomerkey': 'SSECustomerKey',
'ssekmskeyid': 'SSEKMSKeyId',
'requestpayer': 'RequestPayer',
'tagging': 'Tagging',
}
#: `S3.Client.delete_object
#: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html
#: #S3.Client.delete_object>`_
_DELETE_OBJECT_KEYS = {
'bucket': 'Bucket',
'key': 'Key',
'mfa': 'MFA',
'versionid': 'VersionId',
'requestpayer': 'RequestPayer',
}
#: `S3.Bucket.create
#: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html
#: #S3.Bucket.create>`_
_POST_CONTAINER_KEYS = {
'acl': 'ACL',
'bucket': 'Bucket',
'createbucketconfiguration': 'CreateBucketConfiguration',
'locationconstraint': 'LocationConstraint',
'grantfullcontrol': 'GrantFullControl',
'grantread': 'GrantRead',
'grantreadacp': 'GrantReadACP',
'grantwrite': 'GrantWrite',
'grantwriteacp': 'GrantWriteACP',
}