Source code for cloudstorage.drivers.amazon

"""Amazon Simple Storage Service (S3) Driver."""

import logging
from typing import Dict, Iterable, List, Union
from urllib.parse import quote

import boto3
from botocore.exceptions import ClientError, WaiterError, ParamValidationError
from inflection import camelize, underscore

from cloudstorage.base import (Blob, Container, ContentLength, Driver,
                               ExtraOptions, FileLike, FormPost, MetaData)
from cloudstorage.exceptions import (CloudStorageError, IsNotEmptyError,
                                     NotFoundError)
from cloudstorage.helpers import file_content_type, validate_file_or_path
from cloudstorage.messages import *

logger = logging.getLogger(__name__)


class S3Driver(Driver):
    """Driver for interacting with Amazon Simple Storage Service (S3).

    .. code-block:: python

        from cloudstorage.drivers.amazon import S3Driver

        storage = S3Driver(key='<my-aws-access-key-id>',
                           secret='<my-aws-secret-access-key>',
                           region='us-east-1')
        # <Driver: S3 us-east-1>

    References:

    * `Boto 3 Docs <http://boto3.readthedocs.io>`_
    * `Amazon S3 REST API Introduction
      <https://docs.aws.amazon.com/AmazonS3/latest/API/Welcome.html>`_

    :param key: AWS Access Key ID.
    :type key: str

    :param secret: AWS Secret Access Key.
    :type secret: str

    :param region: (optional) Region to connect to. Defaults to `us-east-1`.
    :type region: str
    """
    name = 'S3'
    hash_type = 'md5'
    url = 'https://aws.amazon.com/s3/'

    def __init__(self, key: str, secret: str = None,
                 region: str = 'us-east-1') -> None:
        region = region.lower()
        super().__init__(key=key, secret=secret, region=region)

        self._session = boto3.Session(aws_access_key_id=key,
                                      aws_secret_access_key=secret,
                                      region_name=region)
        # The session is required for loading the regions list.
        if region not in self.regions:
            raise CloudStorageError(region_not_found % region)
    def __iter__(self) -> Iterable[Container]:
        for bucket in self.s3.buckets.all():
            yield self._make_container(bucket)
    def __len__(self) -> int:
        buckets = list(self.s3.buckets.all())
        return len(buckets)
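    # A minimal usage sketch for the container protocol methods above
    # (credential placeholders are illustrative):
    #
    #     storage = S3Driver(key='<my-aws-access-key-id>',
    #                        secret='<my-aws-secret-access-key>')
    #     for container in storage:  # __iter__ yields each bucket
    #         print(container.name)
    #     print(len(storage))        # __len__ counts the buckets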
    @staticmethod
    def _normalize_parameters(params: Dict[str, str],
                              normalizers: Dict[str, str]) -> Dict[str, str]:
        normalized = {}

        for key, value in params.items():
            if not value:
                continue

            key_inflected = camelize(underscore(key),
                                     uppercase_first_letter=True)
            # Only include parameters found in normalizers
            key_overrider = normalizers.get(key_inflected.lower())
            if key_overrider:
                normalized[key_overrider] = value

        return normalized

    def _get_bucket(self, bucket_name: str, validate: bool = True):
        """Get a S3 bucket.

        :param bucket_name: The Bucket's name identifier.
        :type bucket_name: str

        :param validate: If True, verify that the bucket exists.
        :type validate: bool

        :return: S3 bucket resource object.
        :rtype: :class:`boto3.s3.Bucket`

        :raises NotFoundError: If the bucket does not exist.
        :raises CloudStorageError: Boto 3 client error.
        """
        bucket = self.s3.Bucket(bucket_name)

        if validate:
            try:
                response = self.s3.meta.client.head_bucket(Bucket=bucket_name)
                logger.debug('response=%s' % response)
            except ClientError as e:
                error_code = int(e.response['Error']['Code'])
                if error_code == 404:
                    raise NotFoundError(container_not_found % bucket_name)

                raise CloudStorageError('%s: %s' % (
                    e.response['Error']['Code'],
                    e.response['Error']['Message']))

            try:
                bucket.wait_until_exists()
            except WaiterError as e:
                # TODO: QUESTION: Raise NotFoundError exception?
                logger.error(e, exc_info=True)

        return bucket

    def _make_blob(self, container: Container, object_summary) -> Blob:
        """Convert S3 Object Summary to Blob instance.

        :param container: The container that holds the blob.
        :type container: :class:`.Container`

        :param object_summary: S3 object summary.
        :type object_summary: :class:`boto3.s3.ObjectSummary`

        :return: A blob object.
        :rtype: :class:`.Blob`

        :raise NotFoundError: If the blob object doesn't exist.
        """
        try:
            name = object_summary.key
            #: etag is wrapped in quotes
            checksum = etag = object_summary.e_tag.replace('"', '')
            size = object_summary.size

            acl = object_summary.Acl()
            meta_data = object_summary.meta.data.get('Metadata', {})
            content_disposition = object_summary.meta.data.get(
                'ContentDisposition', None)
            content_type = object_summary.meta.data.get('ContentType', None)
            modified_at = object_summary.last_modified
            created_at = None
            expires_at = None  # TODO: FEATURE: Delete at / expires at
        except ClientError as e:
            error_code = int(e.response['Error']['Code'])
            if error_code == 404:
                raise NotFoundError(blob_not_found % (container.name,
                                                      object_summary.key))

            raise CloudStorageError('%s: %s' % (
                e.response['Error']['Code'],
                e.response['Error']['Message']))

        return Blob(name=name, checksum=checksum, etag=etag, size=size,
                    container=container, driver=self, acl=acl,
                    meta_data=meta_data,
                    content_disposition=content_disposition,
                    content_type=content_type, created_at=created_at,
                    modified_at=modified_at, expires_at=expires_at)

    def _make_container(self, bucket) -> Container:
        """Convert S3 Bucket to Container.

        :param bucket: S3 bucket object.
        :type bucket: :class:`boto3.s3.Bucket`

        :return: The container if it exists.
        :rtype: :class:`.Container`
        """
        acl = bucket.Acl()
        created_at = bucket.creation_date.astimezone(tz=None)
        return Container(name=bucket.name, driver=self, acl=acl,
                         meta_data=None, created_at=created_at)

    @property
    def session(self) -> boto3.session.Session:
        """Amazon Web Services session.

        :return: AWS session.
        :rtype: :class:`boto3.session.Session`
        """
        return self._session

    # noinspection PyUnresolvedReferences
    @property
    def s3(self) -> boto3.resources.base.ServiceResource:
        """S3 service resource.

        :return: The s3 resource instance.
        :rtype: :class:`boto3.resources.base.ServiceResource`
        """
        return self.session.resource(service_name='s3',
                                     region_name=self.region)

    @property
    def regions(self) -> List[str]:
        return self.session.get_available_regions('s3')
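    # How _normalize_parameters behaves with the key maps defined at the
    # bottom of this class (values are illustrative):
    #
    #     extra = {'content_type': 'image/png', 'unknown_key': 'dropped'}
    #     S3Driver._normalize_parameters(extra, S3Driver._PUT_OBJECT_KEYS)
    #     # -> {'ContentType': 'image/png'}
    #
    # Keys are inflected ('content_type' -> 'ContentType'), matched against
    # the normalizer map case-insensitively, and unknown keys are dropped.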
    def create_container(self, container_name: str, acl: str = None,
                         meta_data: MetaData = None) -> Container:
        if meta_data:
            logger.info(option_not_supported % 'meta_data')

        # Required parameters
        params = {
            'Bucket': container_name,
        }

        if acl:
            params['ACL'] = acl.lower()

        # TODO: BUG: Creating S3 bucket in us-east-1
        # See https://github.com/boto/boto3/issues/125
        if self.region != 'us-east-1':
            params['CreateBucketConfiguration'] = {
                'LocationConstraint': self.region,
            }

        logger.debug('params=%s' % params)

        try:
            bucket = self.s3.create_bucket(**params)
        except ParamValidationError as e:
            msg = e.kwargs.get('report', container_name_invalid)
            raise CloudStorageError(msg)

        bucket.wait_until_exists()
        return self._make_container(bucket)
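    # Usage sketch, assuming `storage` is a configured S3Driver (the bucket
    # name is illustrative and must be globally unique on S3):
    #
    #     container = storage.create_container('my-unique-bucket-1234',
    #                                          acl='public-read')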
    def get_container(self, container_name: str) -> Container:
        bucket = self._get_bucket(container_name)
        return self._make_container(bucket)
    def patch_container(self, container: Container) -> None:
        raise NotImplementedError
    def delete_container(self, container: Container) -> None:
        bucket = self._get_bucket(container.name, validate=False)

        try:
            bucket.delete()
        except ClientError as e:
            error_code = e.response['Error']['Code']
            if error_code == 'BucketNotEmpty':
                raise IsNotEmptyError(container_not_empty % bucket.name)
            raise
    def container_cdn_url(self, container: Container) -> str:
        bucket = self._get_bucket(container.name, validate=False)
        endpoint_url = bucket.meta.client.meta.endpoint_url
        return '%s/%s' % (endpoint_url, container.name)
    def enable_container_cdn(self, container: Container) -> bool:
        logger.warning(feature_not_supported % 'enable_container_cdn')
        return False
    def disable_container_cdn(self, container: Container) -> bool:
        logger.warning(feature_not_supported % 'disable_container_cdn')
        return False
    def upload_blob(self, container: Container,
                    filename: Union[str, FileLike],
                    blob_name: str = None, acl: str = None,
                    meta_data: MetaData = None, content_type: str = None,
                    content_disposition: str = None,
                    extra: ExtraOptions = None) -> Blob:
        meta_data = {} if meta_data is None else meta_data
        extra = {} if extra is None else extra

        extra_args = self._normalize_parameters(extra, self._PUT_OBJECT_KEYS)

        # Default arguments
        if acl:
            extra_args.setdefault('ACL', acl.lower())

        extra_args.setdefault('Metadata', meta_data)
        extra_args.setdefault('StorageClass', 'STANDARD')

        if content_disposition:
            extra_args['ContentDisposition'] = content_disposition

        blob_name = blob_name or validate_file_or_path(filename)

        # Boto uses application/octet-stream by default
        if not content_type:
            if isinstance(filename, str):
                # TODO: QUESTION: Any advantages between filename vs blob_name?
                extra_args['ContentType'] = file_content_type(filename)
            else:
                extra_args['ContentType'] = file_content_type(blob_name)
        else:
            extra_args['ContentType'] = content_type

        logger.debug('extra_args=%s' % extra_args)

        if isinstance(filename, str):
            self.s3.Bucket(container.name).upload_file(Filename=filename,
                                                       Key=blob_name,
                                                       ExtraArgs=extra_args)
        else:
            self.s3.Bucket(container.name).upload_fileobj(
                Fileobj=filename, Key=blob_name, ExtraArgs=extra_args)

        return self.get_blob(container, blob_name)
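    # Usage sketch, assuming `container` came from get_container() or
    # create_container() (the file path and metadata are illustrative):
    #
    #     blob = storage.upload_blob(container, '/path/picture.png',
    #                                meta_data={'owner': 'user-id'},
    #                                content_type='image/png')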
    def get_blob(self, container: Container, blob_name: str) -> Blob:
        object_summary = self.s3.ObjectSummary(bucket_name=container.name,
                                               key=blob_name)
        return self._make_blob(container, object_summary)
    def get_blobs(self, container: Container) -> Iterable[Blob]:
        bucket = self._get_bucket(container.name, validate=False)
        for key in bucket.objects.all():  # s3.ObjectSummary
            yield self._make_blob(container, key)
    def download_blob(self, blob: Blob,
                      destination: Union[str, FileLike]) -> None:
        if isinstance(destination, str):
            self.s3.Bucket(name=blob.container.name).download_file(
                Key=blob.name, Filename=destination, ExtraArgs={})
        else:
            self.s3.Bucket(name=blob.container.name).download_fileobj(
                Key=blob.name, Fileobj=destination, ExtraArgs={})
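    # Usage sketch: download to a file path or to any writable binary
    # file-like object (paths are illustrative):
    #
    #     storage.download_blob(blob, '/tmp/picture-copy.png')
    #
    #     with open('/tmp/picture-copy.png', 'wb') as picture_file:
    #         storage.download_blob(blob, picture_file)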
    def patch_blob(self, blob: Blob) -> None:
        raise NotImplementedError
    def delete_blob(self, blob: Blob) -> None:
        # Required parameters
        params = {
            'Bucket': blob.container.name,
            'Key': blob.name,
        }
        logger.debug('params=%s' % params)

        try:
            response = self.s3.meta.client.delete_object(**params)
            logger.debug('response=%s' % response)
        except ClientError as e:
            error_code = int(e.response['Error']['Code'])
            if error_code not in (200, 204):
                raise NotFoundError(blob_not_found % (blob.name,
                                                      blob.container.name))
            raise
    def blob_cdn_url(self, blob: Blob) -> str:
        container_url = self.container_cdn_url(blob.container)
        blob_name_cleaned = quote(blob.name)
        return '%s/%s' % (container_url, blob_name_cleaned)
    def generate_container_upload_url(self, container: Container,
                                      blob_name: str,
                                      expires: int = 3600, acl: str = None,
                                      meta_data: MetaData = None,
                                      content_disposition: str = None,
                                      content_length: ContentLength = None,
                                      content_type: str = None,
                                      extra: ExtraOptions = None) -> FormPost:
        meta_data = {} if meta_data is None else meta_data
        extra = {} if extra is None else extra
        extra_norm = self._normalize_parameters(extra, self._POST_OBJECT_KEYS)

        conditions = []
        fields = {}

        if acl:
            conditions.append({'acl': acl})
            fields['acl'] = acl

        headers = {
            'Content-Disposition': content_disposition,
            'Content-Type': content_type,
        }
        for header_name, header_value in headers.items():
            if not header_value:
                continue

            fields[header_name.lower()] = header_value
            conditions.append(['eq', '$' + header_name, header_value])

        # Add content-length-range which is a tuple
        if content_length:
            min_range, max_range = content_length
            conditions.append(['content-length-range', min_range, max_range])

        for meta_name, meta_value in meta_data.items():
            meta_name = self._OBJECT_META_PREFIX + meta_name
            fields[meta_name] = meta_value
            conditions.append({meta_name: meta_value})

        # Add extra conditions and fields
        for extra_name, extra_value in extra_norm.items():
            fields[extra_name] = extra_value
            conditions.append({extra_name: extra_value})

        return self.s3.meta.client.generate_presigned_post(
            Bucket=container.name, Key=blob_name, Fields=fields,
            Conditions=conditions, ExpiresIn=int(expires))
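    # Usage sketch: generate_presigned_post returns a dict with the upload
    # 'url' and signed form 'fields'. Posting it with the third-party
    # `requests` package (not a dependency of this module; shown only as an
    # assumption, with illustrative names):
    #
    #     form = storage.generate_container_upload_url(
    #         container, 'avatar.png', expires=3600,
    #         content_type='image/png')
    #     with open('/path/avatar.png', 'rb') as avatar_file:
    #         requests.post(form['url'], data=form['fields'],
    #                       files={'file': avatar_file})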
    def generate_blob_download_url(self, blob: Blob, expires: int = 3600,
                                   method: str = 'GET',
                                   content_disposition: str = None,
                                   extra: ExtraOptions = None) -> str:
        extra = extra if extra is not None else {}
        params = self._normalize_parameters(extra, self._GET_OBJECT_KEYS)

        # Required parameters
        params['Bucket'] = blob.container.name
        params['Key'] = blob.name

        # Optional
        if content_disposition:
            params['ResponseContentDisposition'] = content_disposition

        logger.debug('params=%s' % params)
        return self.s3.meta.client.generate_presigned_url(
            ClientMethod='get_object', Params=params,
            ExpiresIn=int(expires), HttpMethod=method.lower())
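    # Usage sketch: create a time-limited download link (values are
    # illustrative):
    #
    #     url = storage.generate_blob_download_url(
    #         blob, expires=120,
    #         content_disposition='attachment; filename=picture.png')
    #     # Any HTTP client can fetch the URL until it expires.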
    _OBJECT_META_PREFIX = 'x-amz-meta-'

    #: `S3.Client.generate_presigned_post
    #: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html
    #: #S3.Client.generate_presigned_post>`_
    _POST_OBJECT_KEYS = {
        'acl': 'acl',
        'cachecontrol': 'Cache-Control',
        'contenttype': 'Content-Type',
        'contentdisposition': 'Content-Disposition',
        'contentencoding': 'Content-Encoding',
        'expires': 'Expires',
        'successactionredirect': 'success_action_redirect',
        'redirect': 'redirect',
        'successactionstatus': 'success_action_status',
        'xamzmeta': 'x-amz-meta-',
    }

    #: `S3.Client.get_object
    #: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html
    #: #S3.Client.get_object>`_
    _GET_OBJECT_KEYS = {
        'bucket': 'Bucket',
        'ifmatch': 'IfMatch',
        'ifmodifiedsince': 'IfModifiedSince',
        'ifnonematch': 'IfNoneMatch',
        'ifunmodifiedsince': 'IfUnmodifiedSince',
        'key': 'Key',
        'range': 'Range',
        'responsecachecontrol': 'ResponseCacheControl',
        'responsecontentdisposition': 'ResponseContentDisposition',
        'responsecontentencoding': 'ResponseContentEncoding',
        'responsecontentlanguage': 'ResponseContentLanguage',
        'responsecontenttype': 'ResponseContentType',
        'responseexpires': 'ResponseExpires',
        'versionid': 'VersionId',
        'ssecustomeralgorithm': 'SSECustomerAlgorithm',
        'ssecustomerkey': 'SSECustomerKey',
        'requestpayer': 'RequestPayer',
        'partnumber': 'PartNumber',
        # Extra keys to standardize across all drivers
        'cachecontrol': 'ResponseCacheControl',
        'contentdisposition': 'ResponseContentDisposition',
        'contentencoding': 'ResponseContentEncoding',
        'contentlanguage': 'ResponseContentLanguage',
        'contenttype': 'ResponseContentType',
        'expires': 'ResponseExpires',
    }

    #: `S3.Client.put_object
    #: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html
    #: #S3.Client.put_object>`_
    _PUT_OBJECT_KEYS = {
        'acl': 'ACL',
        'body': 'Body',
        'bucket': 'Bucket',
        'cachecontrol': 'CacheControl',
        'contentdisposition': 'ContentDisposition',
        'contentencoding': 'ContentEncoding',
        'contentlanguage': 'ContentLanguage',
        'contentlength': 'ContentLength',
        'contentmd5': 'ContentMD5',
        'contenttype': 'ContentType',
        'expires': 'Expires',
        'grantfullcontrol': 'GrantFullControl',
        'grantread': 'GrantRead',
        'grantreadacp': 'GrantReadACP',
        'grantwriteacp': 'GrantWriteACP',
        'key': 'Key',
        'metadata': 'Metadata',
        'serversideencryption': 'ServerSideEncryption',
        'storageclass': 'StorageClass',
        'websiteredirectlocation': 'WebsiteRedirectLocation',
        'ssecustomeralgorithm': 'SSECustomerAlgorithm',
        'ssecustomerkey': 'SSECustomerKey',
        'ssekmskeyid': 'SSEKMSKeyId',
        'requestpayer': 'RequestPayer',
        'tagging': 'Tagging',
    }

    #: `S3.Client.delete_object
    #: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html
    #: #S3.Client.delete_object>`_
    _DELETE_OBJECT_KEYS = {
        'bucket': 'Bucket',
        'key': 'Key',
        'mfa': 'MFA',
        'versionid': 'VersionId',
        'requestpayer': 'RequestPayer',
    }

    #: `S3.Bucket.create
    #: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html
    #: #S3.Bucket.create>`_
    _POST_CONTAINER_KEYS = {
        'acl': 'ACL',
        'bucket': 'Bucket',
        'createbucketconfiguration': 'CreateBucketConfiguration',
        'locationconstraint': 'LocationConstraint',
        'grantfullcontrol': 'GrantFullControl',
        'grantread': 'GrantRead',
        'grantreadacp': 'GrantReadACP',
        'grantwrite': 'GrantWrite',
        'grantwriteacp': 'GrantWriteACP',
    }