
"""Amazon Simple Storage Service (S3) Driver."""
import logging
from typing import Any, Dict, Iterable, List, TYPE_CHECKING  # noqa: F401
from urllib.parse import quote, urljoin

import boto3
from botocore.exceptions import ClientError, ParamValidationError, WaiterError
from inflection import camelize, underscore

from cloudstorage import Blob, Container, Driver, messages
from cloudstorage.exceptions import (
    CloudStorageError,
    CredentialsError,
    IsNotEmptyError,
    NotFoundError,
)
from cloudstorage.helpers import file_content_type, validate_file_or_path
from cloudstorage.typed import (
    ContentLength,
    ExtraOptions,
    FileLike,
    FormPost,
    MetaData,
)

if TYPE_CHECKING:
    from cloudstorage.structures import CaseInsensitiveDict  # noqa

__all__ = ["S3Driver"]

logger = logging.getLogger(__name__)


class S3Driver(Driver):
    """Driver for interacting with Amazon Simple Storage Service (S3).

    .. code-block:: python

        from cloudstorage.drivers.amazon import S3Driver

        storage = S3Driver(key='<my-aws-access-key-id>',
                           secret='<my-aws-secret-access-key>',
                           region='us-east-1')
        # <Driver: S3 us-east-1>

    References:

    * `Boto 3 Docs
      <https://boto3.amazonaws.com/v1/documentation/api/latest/index.html>`_
    * `Amazon S3 REST API Introduction
      <https://docs.aws.amazon.com/AmazonS3/latest/API/Welcome.html>`_

    :param key: AWS Access Key ID.
    :type key: str

    :param secret: AWS Secret Access Key.
    :type secret: str

    :param region: (optional) Region to connect to. Defaults to `us-east-1`.
    :type region: str

    :param kwargs: (optional) Extra driver options.
    :type kwargs: dict
    """

    name = "S3"
    hash_type = "md5"
    url = "https://aws.amazon.com/s3/"

    def __init__(
        self, key: str, secret: str = None, region: str = "us-east-1", **kwargs: Dict
    ) -> None:
        region = region.lower()
        super().__init__(key=key, secret=secret, region=region, **kwargs)

        self._session = boto3.Session(
            aws_access_key_id=key, aws_secret_access_key=secret, region_name=region
        )
        # session required for loading regions list
        if region not in self.regions:
            raise CloudStorageError(messages.REGION_NOT_FOUND % region)

    def __iter__(self) -> Iterable[Container]:
        for bucket in self.s3.buckets.all():
            yield self._make_container(bucket)

    def __len__(self) -> int:
        buckets = [bucket for bucket in self.s3.buckets.all()]
        return len(buckets)
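
    # Usage sketch: the two dunder methods above let a driver instance behave
    # like a collection of containers. Illustrative only; assumes valid AWS
    # credentials and at least one bucket in the account:
    #
    #     storage = S3Driver(key="<key>", secret="<secret>")
    #     len(storage)                  # number of buckets
    #     [c.name for c in storage]     # bucket names, via __iter__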

    @staticmethod
    def _normalize_parameters(
        params: Dict[str, str], normalizers: Dict[str, str]
    ) -> Dict[str, str]:
        normalized = params.copy()

        for key, value in params.items():
            normalized.pop(key)
            if not value:
                continue

            key_inflected = camelize(underscore(key), uppercase_first_letter=True)
            # Only include parameters found in normalizers
            key_overrider = normalizers.get(key_inflected.lower())
            if key_overrider:
                normalized[key_overrider] = value

        return normalized

    def _get_bucket(self, bucket_name: str, validate: bool = True):
        """Get a S3 bucket.

        :param bucket_name: The Bucket's name identifier.
        :type bucket_name: str

        :param validate: If True, verify that the bucket exists.
        :type validate: bool

        :return: S3 bucket resource object.
        :rtype: :class:`boto3.s3.Bucket`

        :raises NotFoundError: If the bucket does not exist.
        :raises CloudStorageError: Boto 3 client error.
        """
        bucket = self.s3.Bucket(bucket_name)

        if validate:
            try:
                response = self.s3.meta.client.head_bucket(Bucket=bucket_name)
                logger.debug("response=%s", response)
            except ClientError as err:
                error_code = int(err.response["Error"]["Code"])
                if error_code == 404:
                    raise NotFoundError(messages.CONTAINER_NOT_FOUND % bucket_name)

                raise CloudStorageError(
                    "%s: %s"
                    % (
                        err.response["Error"]["Code"],
                        err.response["Error"]["Message"],
                    )
                )

            try:
                bucket.wait_until_exists()
            except WaiterError as err:
                logger.error(err)

        return bucket

    def _make_blob(self, container: Container, object_summary) -> Blob:
        """Convert S3 Object Summary to Blob instance.

        :param container: The container that holds the blob.
        :type container: :class:`.Container`

        :param object_summary: S3 object summary.
        :type object_summary: :class:`boto3.s3.ObjectSummary`

        :return: A blob object.
        :rtype: :class:`.Blob`

        :raise NotFoundError: If the blob object doesn't exist.
        """
        try:
            name = object_summary.key
            #: etag wrapped in quotes
            checksum = etag = object_summary.e_tag.replace('"', "")
            size = object_summary.size

            acl = object_summary.Acl()
            meta_data = object_summary.meta.data.get("Metadata", {})
            content_disposition = object_summary.meta.data.get(
                "ContentDisposition", None
            )
            content_type = object_summary.meta.data.get("ContentType", None)
            cache_control = object_summary.meta.data.get("CacheControl", None)
            modified_at = object_summary.last_modified
            created_at = None
            expires_at = None  # TODO: FEATURE: Delete at / expires at
        except ClientError as err:
            error_code = int(err.response["Error"]["Code"])
            if error_code == 404:
                raise NotFoundError(
                    messages.BLOB_NOT_FOUND % (object_summary.key, container.name)
                )

            raise CloudStorageError(
                "%s: %s"
                % (
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
            )

        return Blob(
            name=name,
            checksum=checksum,
            etag=etag,
            size=size,
            container=container,
            driver=self,
            acl=acl,
            meta_data=meta_data,
            content_disposition=content_disposition,
            content_type=content_type,
            cache_control=cache_control,
            created_at=created_at,
            modified_at=modified_at,
            expires_at=expires_at,
        )

    def _make_container(self, bucket) -> Container:
        """Convert S3 Bucket to Container.

        :param bucket: S3 bucket object.
        :type bucket: :class:`boto3.s3.Bucket`

        :return: The container if it exists.
        :rtype: :class:`.Container`
        """
        acl = bucket.Acl()
        created_at = bucket.creation_date.astimezone(tz=None)
        return Container(
            name=bucket.name,
            driver=self,
            acl=acl,
            meta_data=None,
            created_at=created_at,
        )

    def _create_bucket_params(self, params: Dict[Any, Any]) -> Dict[Any, Any]:
        """Process extra create bucket params.

        :param params: Default create bucket parameters.

        :return: Final create bucket parameters.
        """
        # TODO: BUG: Creating S3 bucket in us-east-1
        if self.region != "us-east-1":
            params["CreateBucketConfiguration"] = {
                "LocationConstraint": self.region,
            }

        return params

    @property
    def session(self) -> boto3.session.Session:
        """Amazon Web Services session.

        :return: AWS session.
        :rtype: :class:`boto3.session.Session`
        """
        return self._session

    # noinspection PyUnresolvedReferences
    @property
    def s3(self) -> boto3.resources.base.ServiceResource:
        """S3 service resource.

        :return: The s3 resource instance.
        :rtype: :class:`boto3.resources.base.ServiceResource`
        """
        return self.session.resource(service_name="s3", region_name=self.region)

    def validate_credentials(self) -> None:
        try:
            self.session.client("sts").get_caller_identity()
        except ClientError as err:
            raise CredentialsError(str(err))
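
    # Credentials check sketch (illustrative; `storage` is a configured
    # driver instance):
    #
    #     try:
    #         storage.validate_credentials()
    #     except CredentialsError as err:
    #         print("invalid AWS credentials:", err)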

    @property
    def regions(self) -> List[str]:
        return self.session.get_available_regions("s3")

    def create_container(
        self, container_name: str, acl: str = None, meta_data: MetaData = None
    ) -> Container:
        if meta_data:
            logger.info(messages.OPTION_NOT_SUPPORTED, "meta_data")

        # Required parameters
        params = {
            "Bucket": container_name,
        }  # type: Dict[Any, Any]

        if acl:
            params["ACL"] = acl.lower()

        params = self._create_bucket_params(params)
        logger.debug("params=%s", params)

        try:
            bucket = self.s3.create_bucket(**params)
        except ParamValidationError as err:
            msg = err.kwargs.get("report", messages.CONTAINER_NAME_INVALID)
            raise CloudStorageError(msg)

        try:
            bucket.wait_until_exists()
        except WaiterError as err:
            logger.error(err)

        return self._make_container(bucket)
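
    # Usage sketch for create_container; the bucket name is a placeholder and
    # must be globally unique, and the credentials must allow s3:CreateBucket:
    #
    #     container = storage.create_container("my-unique-bucket-name")
    #     container.name  # "my-unique-bucket-name"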

    def get_container(self, container_name: str) -> Container:
        bucket = self._get_bucket(container_name)
        return self._make_container(bucket)

    def patch_container(self, container: Container) -> None:
        raise NotImplementedError

    def delete_container(self, container: Container) -> None:
        bucket = self._get_bucket(container.name, validate=False)

        try:
            bucket.delete()
        except ClientError as err:
            error_code = err.response["Error"]["Code"]
            if error_code == "BucketNotEmpty":
                raise IsNotEmptyError(messages.CONTAINER_NOT_EMPTY % bucket.name)
            raise

    def container_cdn_url(self, container: Container) -> str:
        bucket = self._get_bucket(container.name, validate=False)
        endpoint_url = bucket.meta.client.meta.endpoint_url
        return "%s/%s" % (endpoint_url, container.name)

    def enable_container_cdn(self, container: Container) -> bool:
        logger.warning(messages.FEATURE_NOT_SUPPORTED, "enable_container_cdn")
        return False

    def disable_container_cdn(self, container: Container) -> bool:
        logger.warning(messages.FEATURE_NOT_SUPPORTED, "disable_container_cdn")
        return False

    def upload_blob(
        self,
        container: Container,
        filename: FileLike,
        blob_name: str = None,
        acl: str = None,
        meta_data: MetaData = None,
        content_type: str = None,
        content_disposition: str = None,
        cache_control: str = None,
        chunk_size: int = 1024,
        extra: ExtraOptions = None,
    ) -> Blob:
        meta_data = {} if meta_data is None else meta_data
        extra = {} if extra is None else extra

        extra_args = self._normalize_parameters(extra, self._PUT_OBJECT_KEYS)

        config = boto3.s3.transfer.TransferConfig(io_chunksize=chunk_size)

        # Default arguments
        extra_args.setdefault("Metadata", meta_data)
        extra_args.setdefault("StorageClass", "STANDARD")

        if acl:
            extra_args.setdefault("ACL", acl.lower())

        if cache_control:
            extra_args.setdefault("CacheControl", cache_control)

        if content_disposition:
            extra_args["ContentDisposition"] = content_disposition

        blob_name = blob_name or validate_file_or_path(filename)

        # Boto uses application/octet-stream by default
        if not content_type:
            if isinstance(filename, str):
                # TODO: QUESTION: Any advantages between filename vs blob_name?
                extra_args["ContentType"] = file_content_type(filename)
            else:
                extra_args["ContentType"] = file_content_type(blob_name)
        else:
            extra_args["ContentType"] = content_type

        logger.debug("extra_args=%s", extra_args)

        if isinstance(filename, str):
            self.s3.Bucket(container.name).upload_file(
                Filename=filename, Key=blob_name, ExtraArgs=extra_args, Config=config
            )
        else:
            self.s3.Bucket(container.name).upload_fileobj(
                Fileobj=filename, Key=blob_name, ExtraArgs=extra_args, Config=config
            )

        return self.get_blob(container, blob_name)
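
    # Upload sketch (hypothetical paths and metadata). A string `filename` is
    # treated as a filesystem path; anything else is treated as a file-like
    # object:
    #
    #     blob = storage.upload_blob(
    #         container,
    #         filename="./avatars/user-1.png",
    #         meta_data={"owner": "user-1"},
    #         content_type="image/png",
    #     )
    #
    #     with open("./report.bin", "rb") as fh:  # file-like upload
    #         storage.upload_blob(container, filename=fh, blob_name="report.bin")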

    def get_blob(self, container: Container, blob_name: str) -> Blob:
        object_summary = self.s3.ObjectSummary(
            bucket_name=container.name, key=blob_name
        )
        return self._make_blob(container, object_summary)

    def get_blobs(self, container: Container) -> Iterable[Blob]:
        bucket = self._get_bucket(container.name, validate=False)
        for key in bucket.objects.all():  # s3.ObjectSummary
            yield self._make_blob(container, key)

    def download_blob(self, blob: Blob, destination: FileLike) -> None:
        if isinstance(destination, str):
            self.s3.Bucket(name=blob.container.name).download_file(
                Key=blob.name, Filename=destination, ExtraArgs={}
            )
        else:
            self.s3.Bucket(name=blob.container.name).download_fileobj(
                Key=blob.name, Fileobj=destination, ExtraArgs={}
            )
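
    # Download sketch mirroring upload_blob's str-vs-file-like dispatch
    # (paths are placeholders):
    #
    #     storage.download_blob(blob, "/tmp/report.bin")   # to a path
    #     with open("/tmp/report.bin", "wb") as fh:        # to a file object
    #         storage.download_blob(blob, fh)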

    def patch_blob(self, blob: Blob) -> None:
        raise NotImplementedError

    def delete_blob(self, blob: Blob) -> None:
        # Required parameters
        params = {
            "Bucket": blob.container.name,
            "Key": blob.name,
        }
        logger.debug("params=%s", params)

        try:
            response = self.s3.meta.client.delete_object(**params)
            logger.debug("response=%s", response)
        except ClientError as err:
            error_code = int(err.response["Error"]["Code"])
            if error_code != 200 and error_code != 204:
                raise NotFoundError(
                    messages.BLOB_NOT_FOUND % (blob.name, blob.container.name)
                )
            raise

    def blob_cdn_url(self, blob: Blob) -> str:
        container_url = self.container_cdn_url(blob.container)
        blob_name_cleaned = quote(blob.name)

        blob_path = "%s/%s" % (container_url, blob_name_cleaned)
        url = urljoin(container_url, blob_path)
        return url

    def generate_container_upload_url(
        self,
        container: Container,
        blob_name: str,
        expires: int = 3600,
        acl: str = None,
        meta_data: MetaData = None,
        content_disposition: str = None,
        content_length: ContentLength = None,
        content_type: str = None,
        cache_control: str = None,
        extra: ExtraOptions = None,
    ) -> FormPost:
        meta_data = {} if meta_data is None else meta_data
        extra = {} if extra is None else extra

        extra_norm = self._normalize_parameters(extra, self._POST_OBJECT_KEYS)

        conditions = []  # type: List[Any]
        fields = {}  # type: Dict[Any, Any]

        if acl:
            conditions.append({"acl": acl})
            fields["acl"] = acl

        headers = {
            "Content-Disposition": content_disposition,
            "Content-Type": content_type,
            "Cache-Control": cache_control,
        }
        for header_name, header_value in headers.items():
            if not header_value:
                continue

            fields[header_name.lower()] = header_value
            conditions.append(["eq", "$" + header_name, header_value])

        # Add content-length-range which is a tuple
        if content_length:
            min_range, max_range = content_length
            conditions.append(["content-length-range", min_range, max_range])

        for meta_name, meta_value in meta_data.items():
            meta_name = self._OBJECT_META_PREFIX + meta_name
            fields[meta_name] = meta_value
            conditions.append({meta_name: meta_value})

        # Add extra conditions and fields
        for extra_name, extra_value in extra_norm.items():
            fields[extra_name] = extra_value
            conditions.append({extra_name: extra_value})

        return self.s3.meta.client.generate_presigned_post(
            Bucket=container.name,
            Key=blob_name,
            Fields=fields,
            Conditions=conditions,
            ExpiresIn=int(expires),
        )
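
    # The returned FormPost is boto3's presigned-post dict: {"url": ...,
    # "fields": {...}}. The client must send every field plus the file.
    # Hypothetical client-side sketch using the `requests` library:
    #
    #     form = storage.generate_container_upload_url(
    #         container, "uploads/avatar.png", expires=300,
    #         content_length=(0, 5 * 1024 * 1024),
    #     )
    #     with open("avatar.png", "rb") as fh:
    #         requests.post(form["url"], data=form["fields"],
    #                       files={"file": fh})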

    def generate_blob_download_url(
        self,
        blob: Blob,
        expires: int = 3600,
        method: str = "GET",
        content_disposition: str = None,
        extra: ExtraOptions = None,
    ) -> str:
        extra = extra if extra is not None else {}
        params = self._normalize_parameters(extra, self._GET_OBJECT_KEYS)

        # Required parameters
        params["Bucket"] = blob.container.name
        params["Key"] = blob.name

        # Optional
        if content_disposition:
            params["ResponseContentDisposition"] = content_disposition

        logger.debug("params=%s", params)
        return self.s3.meta.client.generate_presigned_url(
            ClientMethod="get_object",
            Params=params,
            ExpiresIn=int(expires),
            HttpMethod=method.lower(),
        )
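
    # Presigned download sketch; the URL embeds the signature and expiry, so
    # the recipient needs no AWS credentials (values are placeholders):
    #
    #     url = storage.generate_blob_download_url(
    #         blob, expires=120,
    #         content_disposition='attachment; filename="report.bin"',
    #     )
    #     requests.get(url)  # or hand the URL to a browser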

    _OBJECT_META_PREFIX = "x-amz-meta-"  # type: str

    #: `S3.Client.generate_presigned_post
    #: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html
    #: #S3.Client.generate_presigned_post>`_
    _POST_OBJECT_KEYS = {
        "acl": "acl",
        "cachecontrol": "Cache-Control",
        "contenttype": "Content-Type",
        "contentdisposition": "Content-Disposition",
        "contentencoding": "Content-Encoding",
        "expires": "Expires",
        "successactionredirect": "success_action_redirect",
        "redirect": "redirect",
        "successactionstatus": "success_action_status",
        "xamzmeta": "x-amz-meta-",
    }

    #: `S3.Client.get_object
    #: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html
    #: #S3.Client.get_object>`_
    _GET_OBJECT_KEYS = {
        "bucket": "Bucket",
        "ifmatch": "IfMatch",
        "ifmodifiedsince": "IfModifiedSince",
        "ifnonematch": "IfNoneMatch",
        "ifunmodifiedsince": "IfUnmodifiedSince",
        "key": "Key",
        "range": "Range",
        "responsecachecontrol": "ResponseCacheControl",
        "responsecontentdisposition": "ResponseContentDisposition",
        "responsecontentencoding": "ResponseContentEncoding",
        "responsecontentlanguage": "ResponseContentLanguage",
        "responsecontenttype": "ResponseContentType",
        "responseexpires": "ResponseExpires",
        "versionid": "VersionId",
        "ssecustomeralgorithm": "SSECustomerAlgorithm",
        "ssecustomerkey": "SSECustomerKey",
        "requestpayer": "RequestPayer",
        "partnumber": "PartNumber",
        # Extra keys to standardize across all drivers
        "cachecontrol": "ResponseCacheControl",
        "contentdisposition": "ResponseContentDisposition",
        "contentencoding": "ResponseContentEncoding",
        "contentlanguage": "ResponseContentLanguage",
        "contenttype": "ResponseContentType",
        "expires": "ResponseExpires",
    }

    #: `S3.Client.put_object
    #: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html
    #: #S3.Client.put_object>`_
    _PUT_OBJECT_KEYS = {
        "acl": "ACL",
        "body": "Body",
        "bucket": "Bucket",
        "cachecontrol": "CacheControl",
        "contentdisposition": "ContentDisposition",
        "contentencoding": "ContentEncoding",
        "contentlanguage": "ContentLanguage",
        "contentlength": "ContentLength",
        "contentmd5": "ContentMD5",
        "contenttype": "ContentType",
        "expires": "Expires",
        "grantfullcontrol": "GrantFullControl",
        "grantread": "GrantRead",
        "grantreadacp": "GrantReadACP",
        "grantwriteacp": "GrantWriteACP",
        "key": "Key",
        "metadata": "Metadata",
        "serversideencryption": "ServerSideEncryption",
        "storageclass": "StorageClass",
        "websiteredirectlocation": "WebsiteRedirectLocation",
        "ssecustomeralgorithm": "SSECustomerAlgorithm",
        "ssecustomerkey": "SSECustomerKey",
        "ssekmskeyid": "SSEKMSKeyId",
        "requestpayer": "RequestPayer",
        "tagging": "Tagging",
    }

    #: `S3.Client.delete_object
    #: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html
    #: #S3.Client.delete_object>`_
    _DELETE_OBJECT_KEYS = {
        "bucket": "Bucket",
        "key": "Key",
        "mfa": "MFA",
        "versionid": "VersionId",
        "requestpayer": "RequestPayer",
    }

    #: `S3.Bucket.create
    #: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html
    #: #S3.Bucket.create>`_
    _POST_CONTAINER_KEYS = {
        "acl": "ACL",
        "bucket": "Bucket",
        "createbucketconfiguration": "CreateBucketConfiguration",
        "locationconstraint": "LocationConstraint",
        "grantfullcontrol": "GrantFullControl",
        "grantread": "GrantRead",
        "grantreadacp": "GrantReadACP",
        "grantwrite": "GrantWrite",
        "grantwriteacp": "GrantWriteACP",
    }