"""Google Cloud Storage Driver."""
import base64
import codecs
import logging
import os
import pathlib
from datetime import datetime, timedelta
from http import HTTPStatus
from typing import Any, Dict, Iterable, List # noqa: F401
# noinspection PyPackageRequirements
from google.auth.exceptions import GoogleAuthError
# noinspection PyPackageRequirements
from google.cloud import storage
# noinspection PyPackageRequirements
from google.cloud.exceptions import Conflict, NotFound
# noinspection PyPackageRequirements
from google.cloud.storage.blob import Blob as GoogleBlob
# noinspection PyPackageRequirements
from google.cloud.storage.bucket import Bucket
from inflection import underscore
from cloudstorage import Blob, Container, Driver, messages
from cloudstorage.exceptions import (
CloudStorageError,
CredentialsError,
IsNotEmptyError,
NotFoundError,
)
from cloudstorage.helpers import file_content_type, validate_file_or_path
from cloudstorage.typed import (
ContentLength,
ExtraOptions,
FileLike,
FormPost,
MetaData,
)
__all__ = ["GoogleStorageDriver"]
logger = logging.getLogger(__name__)
[docs]class GoogleStorageDriver(Driver):
"""Driver for interacting with Google Cloud Storage.
The driver will check for `GOOGLE_APPLICATION_CREDENTIALS` environment
variable before connecting. If not found, the driver will use service
worker credentials json file path passed to `key` argument.
.. code-block:: python
from cloudstorage.drivers.google import GoogleStorageDriver
credentials_json_file = '/path/cloud-storage-service-account.json'
storage = GoogleStorageDriver(key=credentials_json_file)
# <Driver: GOOGLESTORAGE>
.. todo: Support for container or blob encryption key.
.. todo: Support for buckets with more than 256 objects on iteration.
References:
* `Google Cloud Storage Documentation
<https://cloud.google.com/storage/docs>`_
* `Storage Client
<https://googleapis.github.io/google-cloud-python/latest/storage/index.html>`_
* `snippets.py
<https://github.com/GoogleCloudPlatform/python-docs-samples/blob/
master/storage/cloud-client/snippets_test.py>`_
:param key: (optional) File path to service worker credentials json file.
:type key: str or None
:param kwargs: (optional) Extra driver options.
:type kwargs: dict
:raise CloudStorageError: If `GOOGLE_APPLICATION_CREDENTIALS` environment
variable is not set and/or credentials json file is not passed to the
`key` argument.
"""
name = "GOOGLESTORAGE"
hash_type = "md5" # TODO: QUESTION: Switch to crc32c?
url = "https://cloud.google.com/storage"
def __init__(self, key: str = None, **kwargs: Dict) -> None:
super().__init__(key=key, **kwargs)
if key:
os.environ[self._CREDENTIALS_ENV_NAME] = key
else:
logger.debug(
"No key provided, attempting to authenticate with Google Metadata API"
)
google_application_credentials = os.getenv(self._CREDENTIALS_ENV_NAME)
if google_application_credentials and not os.path.isfile(
google_application_credentials
):
raise CredentialsError(
"Please set environment variable "
"'GOOGLE_APPLICATION_CREDENTIALS' or provider file path "
"to Google service account key json file."
)
self._client = storage.Client()
[docs] def __iter__(self) -> Iterable[Container]:
for bucket in self.client.list_buckets():
yield self._make_container(bucket)
[docs] def __len__(self) -> int:
containers = self.client.list_buckets()
return len(list(containers))
@staticmethod
def _normalize_parameters(
params: Dict[str, str], normalizers: Dict[str, str]
) -> Dict[str, str]:
normalized = params.copy()
for key, value in params.items():
normalized.pop(key)
if not value:
continue
key_inflected = underscore(key).lower()
# Only include parameters found in normalizers
key_overrider = normalizers.get(key_inflected)
if key_overrider:
normalized[key_overrider] = value
return normalized
def _get_blob(self, bucket_name: str, blob_name: str) -> GoogleBlob:
"""Get a blob object by name.
:param bucket_name: The name of the container that containers the blob.
:type bucket_name:
:param blob_name: The name of the blob to get.
:type blob_name: str
:return: The blob object if it exists.
:rtype: :class:`google.client.storage.blob.Blob`
"""
bucket = self._get_bucket(bucket_name)
blob = bucket.get_blob(blob_name)
if not blob:
raise NotFoundError(messages.BLOB_NOT_FOUND % (blob_name, bucket_name))
return blob
def _get_bucket(self, bucket_name: str) -> Bucket:
"""Get a bucket by name.
:param bucket_name: The name of the bucket to get.
:type bucket_name: str
:return: The bucket matching the name provided.
:rtype: :class:`google.cloud.storage.bucket.Bucket`
"""
try:
return self.client.get_bucket(bucket_name)
except NotFound:
raise NotFoundError(messages.CONTAINER_NOT_FOUND % bucket_name)
def _make_container(self, bucket: Bucket) -> Container:
"""Convert Google Storage Bucket to Cloud Storage Container.
:param bucket: The bucket to convert.
:type bucket: :class:`google.client.storage.bucket.Bucket`
:return: A container instance.
:rtype: :class:`.Container`
"""
acl = bucket.acl
created_at = bucket.time_created.astimezone(tz=None)
return Container(
name=bucket.name,
driver=self,
acl=acl,
meta_data=None,
created_at=created_at,
)
def _make_blob(self, container: Container, blob: GoogleBlob) -> Blob:
"""Convert Google Storage Blob to a Cloud Storage Blob.
References:
* `Objects <https://cloud.google.com/storage/docs/json_api/v1/objects>`_
:param container: Container instance.
:type container: :class:`.Container`
:param blob: Google Storage blob.
:type blob: :class:`google.cloud.storage.blob.Blob`
:return: Blob instance.
:rtype: :class:`.Blob`
"""
etag_bytes = base64.b64decode(blob.etag)
try:
etag = etag_bytes.hex()
except AttributeError:
# Python 3.4: 'bytes' object has no attribute 'hex'
etag = codecs.encode(etag_bytes, "hex_codec").decode("ascii")
md5_bytes = base64.b64decode(blob.md5_hash)
try:
md5_hash = md5_bytes.hex()
except AttributeError:
# Python 3.4: 'bytes' object has no attribute 'hex'
md5_hash = codecs.encode(md5_bytes, "hex_codec").decode("ascii")
return Blob(
name=blob.name,
checksum=md5_hash,
etag=etag,
size=blob.size,
container=container,
driver=self,
acl=blob.acl,
meta_data=blob.metadata,
content_disposition=blob.content_disposition,
content_type=blob.content_type,
cache_control=blob.cache_control,
created_at=blob.time_created,
modified_at=blob.updated,
)
@property
def client(self) -> storage.client.Client:
"""The client bound to this driver.
:return: Client for interacting with the Google Cloud Storage API.
:rtype: :class:`google.cloud.storage.client.Client`
"""
return self._client
[docs] def validate_credentials(self) -> None:
try:
for _ in self.client.list_buckets():
break
except GoogleAuthError as err:
raise CredentialsError(str(err))
@property
def regions(self) -> List[str]:
logger.warning("Regions not supported.")
return []
[docs] def create_container(
self, container_name: str, acl: str = None, meta_data: MetaData = None
) -> Container:
if meta_data:
logger.warning(messages.OPTION_NOT_SUPPORTED, "meta_data")
try:
bucket = self.client.create_bucket(container_name)
except Conflict:
logger.debug(messages.CONTAINER_EXISTS, container_name)
bucket = self._get_bucket(container_name)
except ValueError as err:
raise CloudStorageError(str(err))
if acl:
bucket.acl.save_predefined(acl)
return self._make_container(bucket)
[docs] def get_container(self, container_name: str) -> Container:
bucket = self._get_bucket(container_name)
return self._make_container(bucket)
[docs] def patch_container(self, container: Container) -> None:
raise NotImplementedError
[docs] def delete_container(self, container: Container) -> None:
bucket = self._get_bucket(container.name)
try:
bucket.delete()
except Conflict as err:
if err.code == HTTPStatus.CONFLICT:
raise IsNotEmptyError(messages.CONTAINER_NOT_EMPTY % bucket.name)
raise
[docs] def container_cdn_url(self, container: Container) -> str:
return "https://storage.googleapis.com/%s" % container.name
[docs] def enable_container_cdn(self, container: Container) -> bool:
bucket = self._get_bucket(container.name)
bucket.make_public(recursive=True, future=True)
return True
[docs] def disable_container_cdn(self, container: Container) -> bool:
bucket = self._get_bucket(container.name)
bucket.acl.all().revoke_read() # opposite of make_public
bucket.acl.save()
return True
[docs] def upload_blob(
self,
container: Container,
filename: FileLike,
blob_name: str = None,
acl: str = None,
meta_data: MetaData = None,
content_type: str = None,
content_disposition: str = None,
cache_control: str = None,
chunk_size: int = 1024,
extra: ExtraOptions = None,
) -> Blob:
extra = extra if extra is not None else {}
extra_args = self._normalize_parameters(extra, self._PUT_OBJECT_KEYS)
extra_args.setdefault("metadata", meta_data)
extra_args.setdefault("content_type", content_type)
extra_args.setdefault("content_disposition", content_disposition)
extra_args.setdefault("cache_control", cache_control)
bucket = self._get_bucket(container.name)
blob_name = blob_name or validate_file_or_path(filename)
blob = bucket.blob(blob_name)
# Default Content-Type is application/octet-stream for upload_from_file
if not content_type:
content_type = file_content_type(blob.name)
if isinstance(filename, str):
blob.upload_from_filename(filename=filename, content_type=content_type)
else:
blob.upload_from_file(file_obj=filename, content_type=content_type)
if acl:
blob.acl.save_predefined(acl)
# Google object metadata (Content-Type set above)
for attr_name, attr_value in extra_args.items():
if attr_name and hasattr(blob, attr_name):
setattr(blob, attr_name, attr_value)
blob.patch()
return self._make_blob(container, blob)
[docs] def get_blob(self, container: Container, blob_name: str) -> Blob:
g_blob = self._get_blob(container.name, blob_name)
return self._make_blob(container, g_blob)
[docs] def get_blobs(self, container: Container) -> Iterable[Blob]:
bucket = self._get_bucket(container.name)
for blob in bucket.list_blobs():
yield self._make_blob(container, blob)
[docs] def download_blob(self, blob: Blob, destination: FileLike) -> None:
g_blob = self._get_blob(blob.container.name, blob.name)
if isinstance(destination, str):
g_blob.download_to_filename(destination)
else:
g_blob.download_to_file(destination)
[docs] def patch_blob(self, blob: Blob) -> None:
raise NotImplementedError
[docs] def delete_blob(self, blob: Blob) -> None:
g_blob = self._get_blob(blob.container.name, blob.name)
g_blob.delete()
[docs] def blob_cdn_url(self, blob: Blob) -> str:
return self._get_blob(blob.container.name, blob.name).public_url
[docs] def generate_container_upload_url(
self,
container: Container,
blob_name: str,
expires: int = 3600,
acl: str = None,
meta_data: MetaData = None,
content_disposition: str = None,
content_length: ContentLength = None,
content_type: str = None,
cache_control: str = None,
extra: ExtraOptions = None,
) -> FormPost:
meta_data = meta_data if meta_data is not None else {}
extra = extra if extra is not None else {}
extra_norm = self._normalize_parameters(extra, self._POST_OBJECT_KEYS)
bucket = self._get_bucket(container.name)
conditions = [
# file name can start with any valid character.
["starts-with", "$key", ""]
] # type: List[Any]
fields = {}
if acl:
conditions.append({"acl": acl})
fields["acl"] = acl
headers = {
"Content-Disposition": content_disposition,
"Content-Type": content_type,
"Cache-Control": cache_control,
}
for header_name, header_value in headers.items():
if not header_value:
continue
fields[header_name.lower()] = header_value
conditions.append(["eq", "$" + header_name, header_value])
# Add content-length-range which is a tuple
if content_length:
min_range, max_range = content_length
conditions.append(["content-length-range", min_range, max_range])
for meta_name, meta_value in meta_data.items():
meta_name = self._OBJECT_META_PREFIX + meta_name
fields[meta_name] = meta_value
conditions.append({meta_name: meta_value})
# Add extra conditions and fields
for extra_name, extra_value in extra_norm.items():
fields[extra_name] = extra_value
conditions.append({extra_name: extra_value})
# Determine key value for blob name when uploaded
if not blob_name: # user provided filename
fields["key"] = "${filename}"
else:
path = pathlib.Path(blob_name)
if path.suffix: # blob_name is filename
fields["key"] = blob_name
else: # prefix + user provided filename
fields["key"] = blob_name + "${filename}"
logger.debug("conditions=%s", conditions)
logger.debug("fields=%s", fields)
expiration = datetime.utcnow() + timedelta(seconds=expires)
# noinspection PyTypeChecker
policy = bucket.generate_upload_policy(
conditions=conditions, expiration=expiration
)
fields.update(policy)
url = "https://{bucket_name}.storage.googleapis.com".format(
bucket_name=container.name
)
return {"url": url, "fields": fields}
[docs] def generate_blob_download_url(
self,
blob: Blob,
expires: int = 3600,
method: str = "GET",
content_disposition: str = None,
extra: ExtraOptions = None,
) -> str:
extra = extra if extra is not None else {}
params = self._normalize_parameters(extra, self._GET_OBJECT_KEYS)
expiration = timedelta(seconds=int(expires))
method_norm = method.upper()
response_type = params.get("content_type", None)
generation = params.get("version", None)
g_blob = self._get_blob(blob.container.name, blob.name)
return g_blob.generate_signed_url(
expiration=expiration,
method=method_norm,
content_type="",
generation=generation,
response_disposition=content_disposition,
response_type=response_type,
)
_CREDENTIALS_ENV_NAME = "GOOGLE_APPLICATION_CREDENTIALS"
_OBJECT_META_PREFIX = "x-goog-meta-"
#: `insert-object
#: <https://cloud.google.com/storage/docs/json_api/v1/objects/insert>`
#: Mapping is for blob class attribute names
_PUT_OBJECT_KEYS = {
"acl": "acl",
"bucket": "bucket",
"cache_control": "cache_control",
"content_disposition": "content_disposition",
"content_encoding": "content_encoding",
"content_length": "content_length",
"content_type": "content_type",
"expires": "expires",
"meta_data": "metadata",
}
#: `post-object
#: <https://cloud.google.com/storage/docs/xml-api/post-object>`_
_POST_OBJECT_KEYS = {
"acl": "acl",
"bucket": "bucket",
"cache_control": "Cache-Control",
"content_disposition": "Content-Disposition",
"content_encoding": "Content-Encoding",
"content_length": "Content-Length",
"content_type": "Content-Type",
"expires": "Expires",
"key": "Key",
"success_action_redirect": "success_action_redirect",
"success_action_status": "success_action_status",
"meta_data": "Metadata",
"x_goog_meta_": "x-goog-meta-",
"content_length_range": "content-length-range",
}
#: `get-object
#: <https://cloud.google.com/storage/docs/xml-api/get-object>`_
_GET_OBJECT_KEYS = {
"content_disposition": "response_disposition",
"content_type": "response_type",
}