# Source code for cloudstorage.drivers.amazon

"""Amazon Simple Storage Service (S3) Driver."""
import logging
from typing import Any, Dict, Iterable, List  # noqa: F401
from urllib.parse import quote, urljoin

import boto3
from botocore.exceptions import ClientError, ParamValidationError, WaiterError
from inflection import camelize, underscore

from cloudstorage import Blob, Container, Driver, messages
from cloudstorage.exceptions import (
    CloudStorageError,
    CredentialsError,
    IsNotEmptyError,
    NotFoundError,
)
from cloudstorage.helpers import file_content_type, validate_file_or_path
from cloudstorage.typed import (
    ContentLength,
    ExtraOptions,
    FileLike,
    FormPost,
    MetaData,
)

__all__ = ['S3Driver']

logger = logging.getLogger(__name__)


class S3Driver(Driver):
    """Driver for interacting with Amazon Simple Storage Service (S3).

    .. code-block:: python

        from cloudstorage.drivers.amazon import S3Driver

        storage = S3Driver(key='<my-aws-access-key-id>',
                           secret='<my-aws-secret-access-key>',
                           region='us-east-1')
        # <Driver: S3 us-east-1>

    References:

    * `Boto 3 Docs <https://boto3.amazonaws.com/v1/documentation/api/
      latest/index.html>`_
    * `Amazon S3 REST API Introduction
      <https://docs.aws.amazon.com/AmazonS3/latest/API/Welcome.html>`_

    :param key: AWS Access Key ID.
    :type key: str

    :param secret: AWS Secret Access Key.
    :type secret: str

    :param region: (optional) Region to connect to. Defaults to `us-east-1`.
    :type region: str

    :param kwargs: (optional) Extra driver options.
    :type kwargs: dict
    """

    name = 'S3'
    hash_type = 'md5'
    url = 'https://aws.amazon.com/s3/'

    def __init__(self, key: str, secret: str = None,
                 region: str = 'us-east-1', **kwargs: Dict) -> None:
        # Regions are case-insensitive in the public region list.
        region = region.lower()
        super().__init__(key=key, secret=secret, region=region, **kwargs)

        self._session = boto3.Session(
            aws_access_key_id=key,
            aws_secret_access_key=secret,
            region_name=region,
        )

        # The session must exist before the region list can be loaded.
        if region not in self.regions:
            raise CloudStorageError(messages.REGION_NOT_FOUND % region)
[docs] def __iter__(self) -> Iterable[Container]: for bucket in self.s3.buckets.all(): yield self._make_container(bucket)
[docs] def __len__(self) -> int: buckets = [bucket for bucket in self.s3.buckets.all()] return len(buckets)
    @staticmethod
    def _normalize_parameters(params: Dict[str, str],
                              normalizers: Dict[str, str]) -> Dict[str, str]:
        """Remap user-supplied option names onto boto 3 argument names.

        Each key is inflected to CamelCase and then looked up (lower cased)
        in `normalizers`; keys without a mapping, and keys with falsy
        values, are silently dropped.

        :param params: User-supplied options.
        :type params: dict

        :param normalizers: Mapping of lower-cased inflected key to the
            boto 3 argument name.
        :type normalizers: dict

        :return: Normalized options.
        :rtype: dict
        """
        normalized = params.copy()

        for key, value in params.items():
            # Remove the original key; it is re-added under the
            # normalized name below if a mapping exists.
            normalized.pop(key)
            if not value:
                continue

            key_inflected = camelize(underscore(key),
                                     uppercase_first_letter=True)
            # Only include parameters found in normalizers
            key_overrider = normalizers.get(key_inflected.lower())
            if key_overrider:
                normalized[key_overrider] = value

        return normalized

    def _get_bucket(self, bucket_name: str, validate: bool = True):
        """Get a S3 bucket.

        :param bucket_name: The Bucket's name identifier.
        :type bucket_name: str

        :param validate: If True, verify that the bucket exists.
        :type validate: bool

        :return: S3 bucket resource object.
        :rtype: :class:`boto3.s3.Bucket`

        :raises NotFoundError: If the bucket does not exist.
        :raises CloudStorageError: Boto 3 client error.
        """
        bucket = self.s3.Bucket(bucket_name)

        if validate:
            try:
                response = self.s3.meta.client.head_bucket(Bucket=bucket_name)
                logger.debug('response=%s', response)
            except ClientError as err:
                # NOTE(review): head_bucket reports a missing bucket with
                # the numeric code '404'; int() would raise ValueError on
                # an alphabetic error code — confirm no such codes reach
                # this path.
                error_code = int(err.response['Error']['Code'])
                if error_code == 404:
                    raise NotFoundError(
                        messages.CONTAINER_NOT_FOUND % bucket_name)

                raise CloudStorageError('%s: %s' % (
                    err.response['Error']['Code'],
                    err.response['Error']['Message']))

            try:
                bucket.wait_until_exists()
            except WaiterError as err:
                # Best effort: log the waiter timeout but still return.
                logger.error(err)

        return bucket

    def _make_blob(self, container: Container, object_summary) -> Blob:
        """Convert S3 Object Summary to Blob instance.

        :param container: The container that holds the blob.
        :type container: :class:`.Container`

        :param object_summary: S3 object summary.
        :type object_summary: :class:`boto3.s3.ObjectSummary`

        :return: A blob object.
        :rtype: :class:`.Blob`

        :raise NotFoundError: If the blob object doesn't exist.
        """
        try:
            name = object_summary.key
            #: etag wrapped in quotes
            checksum = etag = object_summary.e_tag.replace('"', '')
            size = object_summary.size

            acl = object_summary.Acl()
            # NOTE(review): meta.data only has these keys after the
            # summary has been loaded — verify against callers.
            meta_data = object_summary.meta.data.get('Metadata', {})
            content_disposition = object_summary.meta.data.get(
                'ContentDisposition', None)
            content_type = object_summary.meta.data.get('ContentType', None)
            cache_control = object_summary.meta.data.get('CacheControl', None)

            modified_at = object_summary.last_modified
            created_at = None
            expires_at = None  # TODO: FEATURE: Delete at / expires at
        except ClientError as err:
            error_code = int(err.response['Error']['Code'])
            if error_code == 404:
                raise NotFoundError(messages.BLOB_NOT_FOUND % (
                    container.name, object_summary.key))

            raise CloudStorageError('%s: %s' % (
                err.response['Error']['Code'],
                err.response['Error']['Message']))

        return Blob(name=name, checksum=checksum, etag=etag, size=size,
                    container=container, driver=self, acl=acl,
                    meta_data=meta_data,
                    content_disposition=content_disposition,
                    content_type=content_type,
                    cache_control=cache_control,
                    created_at=created_at,
                    modified_at=modified_at,
                    expires_at=expires_at)

    def _make_container(self, bucket) -> Container:
        """Convert S3 Bucket to Container.

        :param bucket: S3 bucket object.
        :type bucket: :class:`boto3.s3.Bucket`

        :return: The container if it exists.
        :rtype: :class:`.Container`
        """
        acl = bucket.Acl()
        # Normalize the creation timestamp to the local timezone.
        created_at = bucket.creation_date.astimezone(tz=None)
        return Container(name=bucket.name, driver=self, acl=acl,
                         meta_data=None, created_at=created_at)

    @property
    def session(self) -> boto3.session.Session:
        """Amazon Web Services session.

        :return: AWS session.
        :rtype: :class:`boto3.session.Session`
        """
        return self._session

    # noinspection PyUnresolvedReferences
    @property
    def s3(self) -> boto3.resources.base.ServiceResource:
        """S3 service resource.

        :return: The s3 resource instance.
        :rtype: :class:`boto3.resources.base.ServiceResource`
        """
        return self.session.resource(service_name='s3',
                                     region_name=self.region)
[docs] def validate_credentials(self) -> None: try: self.session.client('sts').get_caller_identity() except ClientError as err: raise CredentialsError(str(err))
@property def regions(self) -> List[str]: return self.session.get_available_regions('s3')
[docs] def create_container(self, container_name: str, acl: str = None, meta_data: MetaData = None) -> Container: if meta_data: logger.info(messages.OPTION_NOT_SUPPORTED, 'meta_data') # Required parameters params = { 'Bucket': container_name, } # type: Dict[Any, Any] if acl: params['ACL'] = acl.lower() # TODO: BUG: Creating S3 bucket in us-east-1 # See https://github.com/boto/boto3/issues/125 if self.region != 'us-east-1': params['CreateBucketConfiguration'] = { 'LocationConstraint': self.region, } logger.debug('params=%s', params) try: bucket = self.s3.create_bucket(**params) except ParamValidationError as err: msg = err.kwargs.get('report', messages.CONTAINER_NAME_INVALID) raise CloudStorageError(msg) try: bucket.wait_until_exists() except WaiterError as err: logger.error(err) return self._make_container(bucket)
[docs] def get_container(self, container_name: str) -> Container: bucket = self._get_bucket(container_name) return self._make_container(bucket)
[docs] def patch_container(self, container: Container) -> None: raise NotImplementedError
[docs] def delete_container(self, container: Container) -> None: bucket = self._get_bucket(container.name, validate=False) try: bucket.delete() except ClientError as err: error_code = err.response['Error']['Code'] if error_code == 'BucketNotEmpty': raise IsNotEmptyError(messages.CONTAINER_NOT_EMPTY % bucket.name) raise
[docs] def container_cdn_url(self, container: Container) -> str: bucket = self._get_bucket(container.name, validate=False) endpoint_url = bucket.meta.client.meta.endpoint_url return '%s/%s' % (endpoint_url, container.name)
[docs] def enable_container_cdn(self, container: Container) -> bool: logger.warning(messages.FEATURE_NOT_SUPPORTED, 'enable_container_cdn') return False
[docs] def disable_container_cdn(self, container: Container) -> bool: logger.warning(messages.FEATURE_NOT_SUPPORTED, 'disable_container_cdn') return False
[docs] def upload_blob(self, container: Container, filename: FileLike, blob_name: str = None, acl: str = None, meta_data: MetaData = None, content_type: str = None, content_disposition: str = None, cache_control: str = None, chunk_size: int = 1024, extra: ExtraOptions = None) -> Blob: meta_data = {} if meta_data is None else meta_data extra = {} if extra is None else extra extra_args = self._normalize_parameters(extra, self._PUT_OBJECT_KEYS) config = boto3.s3.transfer.TransferConfig(io_chunksize=chunk_size) # Default arguments extra_args.setdefault('Metadata', meta_data) extra_args.setdefault('StorageClass', 'STANDARD') if acl: extra_args.setdefault('ACL', acl.lower()) if cache_control: extra_args.setdefault('CacheControl', cache_control) if content_disposition: extra_args['ContentDisposition'] = content_disposition blob_name = blob_name or validate_file_or_path(filename) # Boto uses application/octet-stream by default if not content_type: if isinstance(filename, str): # TODO: QUESTION: Any advantages between filename vs blob_name? extra_args['ContentType'] = file_content_type(filename) else: extra_args['ContentType'] = file_content_type(blob_name) else: extra_args['ContentType'] = content_type logger.debug('extra_args=%s', extra_args) if isinstance(filename, str): self.s3.Bucket(container.name).upload_file(Filename=filename, Key=blob_name, ExtraArgs=extra_args, Config=config) else: self.s3.Bucket(container.name).upload_fileobj(Fileobj=filename, Key=blob_name, ExtraArgs=extra_args, Config=config) return self.get_blob(container, blob_name)
[docs] def get_blob(self, container: Container, blob_name: str) -> Blob: object_summary = self.s3.ObjectSummary(bucket_name=container.name, key=blob_name) return self._make_blob(container, object_summary)
[docs] def get_blobs(self, container: Container) -> Iterable[Blob]: bucket = self._get_bucket(container.name, validate=False) for key in bucket.objects.all(): # s3.ObjectSummary yield self._make_blob(container, key)
[docs] def download_blob(self, blob: Blob, destination: FileLike) -> None: if isinstance(destination, str): self.s3.Bucket(name=blob.container.name).download_file( Key=blob.name, Filename=destination, ExtraArgs={}) else: self.s3.Bucket(name=blob.container.name).download_fileobj( Key=blob.name, Fileobj=destination, ExtraArgs={})
[docs] def patch_blob(self, blob: Blob) -> None: raise NotImplementedError
[docs] def delete_blob(self, blob: Blob) -> None: # Required parameters params = { 'Bucket': blob.container.name, 'Key': blob.name, } logger.debug('params=%s', params) try: response = self.s3.meta.client.delete_object(**params) logger.debug('response=%s', response) except ClientError as err: error_code = int(err.response['Error']['Code']) if error_code != 200 or error_code != 204: raise NotFoundError(messages.BLOB_NOT_FOUND % ( blob.name, blob.container.name)) raise
[docs] def blob_cdn_url(self, blob: Blob) -> str: container_url = self.container_cdn_url(blob.container) blob_name_cleaned = quote(blob.name) blob_path = '%s/%s' % (container_url, blob_name_cleaned) url = urljoin(container_url, blob_path) return url
[docs] def generate_container_upload_url(self, container: Container, blob_name: str, expires: int = 3600, acl: str = None, meta_data: MetaData = None, content_disposition: str = None, content_length: ContentLength = None, content_type: str = None, cache_control: str = None, extra: ExtraOptions = None) -> FormPost: meta_data = {} if meta_data is None else meta_data extra = {} if extra is None else extra extra_norm = self._normalize_parameters(extra, self._POST_OBJECT_KEYS) conditions = [] # type: List[Any] fields = {} # type: Dict[Any, Any] if acl: conditions.append({'acl': acl}) fields['acl'] = acl headers = { 'Content-Disposition': content_disposition, 'Content-Type': content_type, 'Cache-Control': cache_control, } for header_name, header_value in headers.items(): if not header_value: continue fields[header_name.lower()] = header_value conditions.append(['eq', '$' + header_name, header_value]) # Add content-length-range which is a tuple if content_length: min_range, max_range = content_length conditions.append(['content-length-range', min_range, max_range]) for meta_name, meta_value in meta_data.items(): meta_name = self._OBJECT_META_PREFIX + meta_name fields[meta_name] = meta_value conditions.append({meta_name: meta_value}) # Add extra conditions and fields for extra_name, extra_value in extra_norm.items(): fields[extra_name] = extra_value conditions.append({extra_name: extra_value}) return self.s3.meta.client.generate_presigned_post( Bucket=container.name, Key=blob_name, Fields=fields, Conditions=conditions, ExpiresIn=int(expires))
[docs] def generate_blob_download_url(self, blob: Blob, expires: int = 3600, method: str = 'GET', content_disposition: str = None, extra: ExtraOptions = None) -> str: extra = extra if extra is not None else {} params = self._normalize_parameters(extra, self._GET_OBJECT_KEYS) # Required parameters params['Bucket'] = blob.container.name params['Key'] = blob.name # Optional if content_disposition: params['ResponseContentDisposition'] = content_disposition logger.debug('params=%s', params) return self.s3.meta.client.generate_presigned_url( ClientMethod='get_object', Params=params, ExpiresIn=int(expires), HttpMethod=method.lower())
_OBJECT_META_PREFIX = 'x-amz-meta-' # type: str #: `S3.Client.generate_presigned_post #: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html #: #S3.Client.generate_presigned_post>`_ _POST_OBJECT_KEYS = { 'acl': 'acl', 'cachecontrol': 'Cache-Control', 'contenttype': 'Content-Type', 'contentdisposition': 'Content-Disposition', 'contentencoding': 'Content-Encoding', 'expires': 'Expires', 'successactionredirect': 'success_action_redirect', 'redirect': 'redirect', 'successactionstatus': 'success_action_status', 'xamzmeta': 'x-amz-meta-', } #: `#S3.Client.get_object #: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html #: #S3.Client.get_object>`_ _GET_OBJECT_KEYS = { 'bucket': 'Bucket', 'ifmatch': 'IfMatch', 'ifmodifiedsince': 'IfModifiedSince', 'ifnonematch': 'IfNoneMatch', 'ifunmodifiedsince': 'IfUnmodifiedSince', 'key': 'Key', 'range': 'Range', 'responsecachecontrol': 'ResponseCacheControl', 'responsecontentdisposition': 'ResponseContentDisposition', 'responsecontentencoding': 'ResponseContentEncoding', 'responsecontentlanguage': 'ResponseContentLanguage', 'responsecontenttype': 'ResponseContentType', 'responseexpires': 'ResponseExpires', 'versionid': 'VersionId', 'ssecustomeralgorithm': 'SSECustomerAlgorithm', 'ssecustomerkey': 'SSECustomerKey', 'requestpayer': 'RequestPayer', 'partnumber': 'PartNumber', # Extra keys to standarize across all drivers 'cachecontrol': 'ResponseCacheControl', 'contentdisposition': 'ResponseContentDisposition', 'contentencoding': 'ResponseContentEncoding', 'contentlanguage': 'ResponseContentLanguage', 'contenttype': 'ResponseContentType', 'expires': 'ResponseExpires', } #: `S3.Client.put_object #: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html #: #S3.Client.put_object>`_ _PUT_OBJECT_KEYS = { 'acl': 'ACL', 'body': 'Body', 'bucket': 'Bucket', 'cachecontrol': 'CacheControl', 'contentdisposition': 'ContentDisposition', 'contentencoding': 'ContentEncoding', 'contentlanguage': 'ContentLanguage', 
'contentlength': 'ContentLength', 'contentmd5': 'ContentMD5', 'contenttype': 'ContentType', 'expires': 'Expires', 'grantfullcontrol': 'GrantFullControl', 'grantread': 'GrantRead', 'grantreadacp': 'GrantReadACP', 'grantwriteacp': 'GrantWriteACP', 'key': 'Key', 'metadata': 'Metadata', 'serversideencryption': 'ServerSideEncryption', 'storageclass': 'StorageClass', 'websiteredirectlocation': 'WebsiteRedirectLocation', 'ssecustomeralgorithm': 'SSECustomerAlgorithm', 'ssecustomerkey': 'SSECustomerKey', 'ssekmskeyid': 'SSEKMSKeyId', 'requestpayer': 'RequestPayer', 'tagging': 'Tagging', } #: `S3.Client.delete_object #: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html #: #S3.Client.delete_object>`_ _DELETE_OBJECT_KEYS = { 'bucket': 'Bucket', 'key': 'Key', 'mfa': 'MFA', 'versionid': 'VersionId', 'requestpayer': 'RequestPayer', } #: `S3.Bucket.create #: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html #: #S3.Bucket.create>`_ _POST_CONTAINER_KEYS = { 'acl': 'ACL', 'bucket': 'Bucket', 'createbucketconfiguration': 'CreateBucketConfiguration', 'locationconstraint': 'LocationConstraint', 'grantfullcontrol': 'GrantFullControl', 'grantread': 'GrantRead', 'grantreadacp': 'GrantReadACP', 'grantwrite': 'GrantWrite', 'grantwriteacp': 'GrantWriteACP', }