"""Google Cloud Storage Driver."""
import base64
import codecs
import logging
import os
import pathlib
from datetime import datetime, timedelta
try:
from http import HTTPStatus
except ImportError:
# noinspection PyUnresolvedReferences
from httpstatus import HTTPStatus
from typing import Dict, Iterable, List, Any # noqa: F401
# noinspection PyPackageRequirements
from google.cloud import storage
# noinspection PyPackageRequirements
from google.cloud.exceptions import Conflict, NotFound
# noinspection PyPackageRequirements
from google.auth.exceptions import GoogleAuthError
# noinspection PyPackageRequirements
from google.cloud.storage.blob import Blob as GoogleBlob
# noinspection PyPackageRequirements
from google.cloud.storage.bucket import Bucket
from inflection import underscore
from cloudstorage import Blob, Container, Driver, messages
from cloudstorage.exceptions import (
CloudStorageError,
IsNotEmptyError,
NotFoundError,
CredentialsError,
)
from cloudstorage.helpers import file_content_type, validate_file_or_path
from cloudstorage.typed import (
FileLike,
MetaData,
ContentLength,
ExtraOptions,
FormPost,
)
__all__ = ['GoogleStorageDriver']
logger = logging.getLogger(__name__)
class GoogleStorageDriver(Driver):
    """Driver for interacting with Google Cloud Storage.

    When `key` is passed, its value is exported as the
    `GOOGLE_APPLICATION_CREDENTIALS` environment variable (replacing any
    existing value). Otherwise the value already present in the environment
    is used. In both cases the resulting value must be the path of an
    existing file.

    .. code-block:: python

        from cloudstorage.drivers.google import GoogleStorageDriver

        credentials_json_file = '/path/cloud-storage-service-account.json'
        storage = GoogleStorageDriver(key=credentials_json_file)
        # <Driver: GOOGLESTORAGE>

    .. todo: Support for container or blob encryption key.
    .. todo: Support for buckets with more than 256 objects on iteration.

    References:

    * `Google Cloud Storage Documentation
      <https://cloud.google.com/storage/docs>`_
    * `Storage Client <https://googleapis.github.io/google-cloud-python/latest/storage/index.html>`_
    * `snippets.py
      <https://github.com/GoogleCloudPlatform/python-docs-samples/blob/
      master/storage/cloud-client/snippets.py>`_

    :param key: (optional) File path to service worker credentials json file.
    :type key: str or None

    :param kwargs: (optional) Extra driver options.
    :type kwargs: dict

    :raise CredentialsError: If `GOOGLE_APPLICATION_CREDENTIALS` environment
      variable is not set and/or credentials json file is not passed to the
      `key` argument, or the path does not point to an existing file.
    """
    name = 'GOOGLESTORAGE'
    hash_type = 'md5'  # TODO: QUESTION: Switch to crc32c?
    url = 'https://cloud.google.com/storage'

    #: Environment variable holding the credentials json file path.
    _CREDENTIALS_ENV_NAME = 'GOOGLE_APPLICATION_CREDENTIALS'

    #: Prefix prepended to user meta data names in upload form fields.
    _OBJECT_META_PREFIX = 'x-goog-meta-'

    #: `insert-object
    #: <https://cloud.google.com/storage/docs/json_api/v1/objects/insert>`
    #: Mapping is for blob class attribute names
    _PUT_OBJECT_KEYS = {
        'acl': 'acl',
        'bucket': 'bucket',
        'cache_control': 'cache_control',
        'content_disposition': 'content_disposition',
        'content_encoding': 'content_encoding',
        'content_length': 'content_length',
        'content_type': 'content_type',
        'expires': 'expires',
        'meta_data': 'metadata',
    }

    #: `post-object
    #: <https://cloud.google.com/storage/docs/xml-api/post-object>`_
    _POST_OBJECT_KEYS = {
        'acl': 'acl',
        'bucket': 'bucket',
        'cache_control': 'Cache-Control',
        'content_disposition': 'Content-Disposition',
        'content_encoding': 'Content-Encoding',
        'content_length': 'Content-Length',
        'content_type': 'Content-Type',
        'expires': 'Expires',
        'key': 'Key',
        'success_action_redirect': 'success_action_redirect',
        'success_action_status': 'success_action_status',
        'meta_data': 'Metadata',
        'x_goog_meta_': 'x-goog-meta-',
        'content_length_range': 'content-length-range',
    }

    #: `get-object
    #: <https://cloud.google.com/storage/docs/xml-api/get-object>`_
    #: Mapping is for ``generate_signed_url`` keyword argument names.
    _GET_OBJECT_KEYS = {
        'content_disposition': 'response_disposition',
        'content_type': 'response_type',
        # Without this entry a 'version' option is dropped by
        # _normalize_parameters and can never reach generate_signed_url.
        'version': 'generation',
    }

    def __init__(self, key: str = None, **kwargs: Dict) -> None:
        super().__init__(key=key, **kwargs)

        if key:
            os.environ[self._CREDENTIALS_ENV_NAME] = key

        credentials_path = os.getenv(self._CREDENTIALS_ENV_NAME)

        # os.getenv returns None when the variable is unset; passing None to
        # os.path.isfile raises TypeError, so test for a missing value first
        # and report it as a credentials problem.
        if not credentials_path or not os.path.isfile(credentials_path):
            raise CredentialsError(
                "Please set environment variable "
                "'GOOGLE_APPLICATION_CREDENTIALS' or provider file path "
                "to Google service account key json file.")

        self._client = storage.Client()

    def __iter__(self) -> Iterable[Container]:
        """Yield a :class:`.Container` for every bucket the client lists."""
        for bucket in self.client.list_buckets():
            yield self._make_container(bucket)

    def __len__(self) -> int:
        """Return the number of buckets the client lists."""
        # Count lazily instead of materialising every bucket in a list.
        return sum(1 for _ in self.client.list_buckets())

    @staticmethod
    def _normalize_parameters(params: Dict[str, str],
                              normalizers: Dict[str, str]) -> Dict[str, str]:
        """Translate user supplied option names to driver specific names.

        Each key is underscored and lower-cased, then looked up in
        `normalizers`. Options with a falsy value and options whose
        normalized name is not in `normalizers` are dropped.

        :param params: Options as supplied by the caller.
        :type params: dict

        :param normalizers: Mapping of normalized option name to the name
          expected by the target API.
        :type normalizers: dict

        :return: New dictionary keyed by the target API names. `params` is
          not modified.
        :rtype: dict
        """
        # Build a fresh dict: popping from a copy of `params` could delete an
        # already translated entry when its target name equals a later key.
        normalized = {}

        for key, value in params.items():
            if not value:
                continue

            key_inflected = underscore(key).lower()

            # Only include parameters found in normalizers
            key_overrider = normalizers.get(key_inflected)
            if key_overrider:
                normalized[key_overrider] = value

        return normalized

    @staticmethod
    def _base64_to_hex(value: str) -> str:
        """Decode a base64 string and return its bytes as a hex string.

        :param value: Base64 encoded value.
        :type value: str

        :return: Hexadecimal representation of the decoded bytes.
        :rtype: str
        """
        raw = base64.b64decode(value)
        try:
            return raw.hex()
        except AttributeError:
            # Python 3.4: 'bytes' object has no attribute 'hex'
            return codecs.encode(raw, 'hex_codec').decode('ascii')

    def _get_blob(self, bucket_name: str, blob_name: str) -> GoogleBlob:
        """Get a blob object by name.

        :param bucket_name: The name of the container that contains the blob.
        :type bucket_name: str

        :param blob_name: The name of the blob to get.
        :type blob_name: str

        :return: The blob object if it exists.
        :rtype: :class:`google.cloud.storage.blob.Blob`

        :raise NotFoundError: If the bucket or the blob does not exist.
        """
        bucket = self._get_bucket(bucket_name)
        blob = bucket.get_blob(blob_name)

        if not blob:
            raise NotFoundError(messages.BLOB_NOT_FOUND % (blob_name,
                                                           bucket_name))

        return blob

    def _get_bucket(self, bucket_name: str) -> Bucket:
        """Get a bucket by name.

        :param bucket_name: The name of the bucket to get.
        :type bucket_name: str

        :return: The bucket matching the name provided.
        :rtype: :class:`google.cloud.storage.bucket.Bucket`

        :raise NotFoundError: If the bucket does not exist.
        """
        try:
            return self.client.get_bucket(bucket_name)
        except NotFound as err:
            raise NotFoundError(
                messages.CONTAINER_NOT_FOUND % bucket_name) from err

    def _make_container(self, bucket: Bucket) -> Container:
        """Convert Google Storage Bucket to Cloud Storage Container.

        :param bucket: The bucket to convert.
        :type bucket: :class:`google.cloud.storage.bucket.Bucket`

        :return: A container instance.
        :rtype: :class:`.Container`
        """
        acl = bucket.acl
        # astimezone(tz=None) converts to the system local time zone.
        created_at = bucket.time_created.astimezone(tz=None)
        return Container(name=bucket.name, driver=self, acl=acl,
                         meta_data=None, created_at=created_at)

    def _make_blob(self, container: Container, blob: GoogleBlob) -> Blob:
        """Convert Google Storage Blob to a Cloud Storage Blob.

        The base64 encoded `etag` and `md5_hash` of the Google blob are
        converted to hexadecimal strings.

        References:

        * `Objects <https://cloud.google.com/storage/docs/json_api/v1/objects>`_

        :param container: Container instance.
        :type container: :class:`.Container`

        :param blob: Google Storage blob.
        :type blob: :class:`google.cloud.storage.blob.Blob`

        :return: Blob instance.
        :rtype: :class:`.Blob`
        """
        # NOTE(review): some objects (e.g. composite objects) may not carry
        # an md5_hash; b64decode(None) would raise here -- confirm whether
        # that case needs handling.
        etag = self._base64_to_hex(blob.etag)
        md5_hash = self._base64_to_hex(blob.md5_hash)

        return Blob(name=blob.name, checksum=md5_hash, etag=etag,
                    size=blob.size, container=container, driver=self,
                    acl=blob.acl, meta_data=blob.metadata,
                    content_disposition=blob.content_disposition,
                    content_type=blob.content_type,
                    cache_control=blob.cache_control,
                    created_at=blob.time_created,
                    modified_at=blob.updated)

    @property
    def client(self) -> storage.client.Client:
        """The client bound to this driver.

        :return: Client for interacting with the Google Cloud Storage API.
        :rtype: :class:`google.cloud.storage.client.Client`
        """
        return self._client

    def validate_credentials(self) -> None:
        """Check the credentials by requesting the first listed bucket.

        :raise CredentialsError: If the client raises a
          :class:`google.auth.exceptions.GoogleAuthError`.
        """
        try:
            # Fetching a single item is enough to trigger authentication.
            for _ in self.client.list_buckets():
                break
        except GoogleAuthError as err:
            raise CredentialsError(str(err)) from err

    @property
    def regions(self) -> List[str]:
        """Regions are not supported: log a warning and return an empty list.

        :return: Always an empty list.
        :rtype: list[str]
        """
        logger.warning('Regions not supported.')
        return []

    def create_container(self, container_name: str, acl: str = None,
                         meta_data: MetaData = None) -> Container:
        """Create a bucket, or fetch it when it already exists.

        :param container_name: Name of the bucket to create.
        :type container_name: str

        :param acl: (optional) Predefined ACL saved on the bucket.
        :type acl: str or None

        :param meta_data: (optional) Not supported; a warning is logged when
          a value is passed.
        :type meta_data: dict or None

        :return: The created or already existing container.
        :rtype: :class:`.Container`

        :raise CloudStorageError: If the client raises `ValueError` while
          creating the bucket.
        """
        if meta_data:
            logger.warning(messages.OPTION_NOT_SUPPORTED, 'meta_data')

        try:
            bucket = self.client.create_bucket(container_name)
        except Conflict:
            # The bucket already exists: reuse it instead of failing.
            logger.debug(messages.CONTAINER_EXISTS, container_name)
            bucket = self._get_bucket(container_name)
        except ValueError as err:
            raise CloudStorageError(str(err)) from err

        if acl:
            bucket.acl.save_predefined(acl)

        return self._make_container(bucket)

    def get_container(self, container_name: str) -> Container:
        """Get a container by name.

        :param container_name: Name of the bucket to get.
        :type container_name: str

        :return: The container matching the name provided.
        :rtype: :class:`.Container`

        :raise NotFoundError: If the bucket does not exist.
        """
        bucket = self._get_bucket(container_name)
        return self._make_container(bucket)

    def patch_container(self, container: Container) -> None:
        """Not implemented for this driver.

        :raise NotImplementedError: Always.
        """
        raise NotImplementedError

    def delete_container(self, container: Container) -> None:
        """Delete the bucket backing `container`.

        :param container: Container to delete.
        :type container: :class:`.Container`

        :raise NotFoundError: If the bucket does not exist.
        :raise IsNotEmptyError: If the delete is rejected with HTTP 409.
        """
        bucket = self._get_bucket(container.name)

        try:
            bucket.delete()
        except Conflict as err:
            if err.code == HTTPStatus.CONFLICT:
                raise IsNotEmptyError(messages.CONTAINER_NOT_EMPTY %
                                      bucket.name)
            raise

    def container_cdn_url(self, container: Container) -> str:
        """Return ``https://storage.googleapis.com/<container name>``.

        :param container: Container instance.
        :type container: :class:`.Container`

        :return: URL built from the container name; no request is made.
        :rtype: str
        """
        return 'https://storage.googleapis.com/%s' % container.name

    def enable_container_cdn(self, container: Container) -> bool:
        """Make the bucket public via ``make_public(recursive, future)``.

        :param container: Container instance.
        :type container: :class:`.Container`

        :return: Always `True`.
        :rtype: bool

        :raise NotFoundError: If the bucket does not exist.
        """
        bucket = self._get_bucket(container.name)
        bucket.make_public(recursive=True, future=True)
        return True

    def disable_container_cdn(self, container: Container) -> bool:
        """Revoke read access for all users on the bucket ACL and save it.

        Only the bucket level ACL is modified here.

        :param container: Container instance.
        :type container: :class:`.Container`

        :return: Always `True`.
        :rtype: bool

        :raise NotFoundError: If the bucket does not exist.
        """
        bucket = self._get_bucket(container.name)
        bucket.acl.all().revoke_read()  # opposite of make_public
        bucket.acl.save()
        return True

    def upload_blob(self, container: Container, filename: FileLike,
                    blob_name: str = None, acl: str = None,
                    meta_data: MetaData = None, content_type: str = None,
                    content_disposition: str = None, cache_control: str = None,
                    chunk_size: int = 1024,
                    extra: ExtraOptions = None) -> Blob:
        """Upload a file path or file like object to a container.

        :param container: Container to upload to.
        :type container: :class:`.Container`

        :param filename: File path (`str`) or file like object to upload.
        :type filename: str or file

        :param blob_name: (optional) Name of the blob; derived from
          `filename` by `validate_file_or_path` when omitted.
        :type blob_name: str or None

        :param acl: (optional) Predefined ACL saved on the blob.
        :type acl: str or None

        :param meta_data: (optional) Meta data set on the blob.
        :type meta_data: dict or None

        :param content_type: (optional) Content type; derived from the blob
          name by `file_content_type` when omitted.
        :type content_type: str or None

        :param content_disposition: (optional) Content disposition.
        :type content_disposition: str or None

        :param cache_control: (optional) Cache control.
        :type cache_control: str or None

        :param chunk_size: Accepted for interface compatibility; not used by
          this driver.
        :type chunk_size: int

        :param extra: (optional) Extra options; only keys listed in
          `_PUT_OBJECT_KEYS` are used and they take precedence over the
          matching keyword arguments.
        :type extra: dict or None

        :return: The uploaded blob.
        :rtype: :class:`.Blob`

        :raise NotFoundError: If the bucket does not exist.
        """
        extra = extra if extra is not None else {}
        extra_args = self._normalize_parameters(extra, self._PUT_OBJECT_KEYS)

        # Keyword arguments only fill in what `extra` did not provide.
        extra_args.setdefault('metadata', meta_data)
        extra_args.setdefault('content_type', content_type)
        extra_args.setdefault('content_disposition', content_disposition)
        extra_args.setdefault('cache_control', cache_control)

        bucket = self._get_bucket(container.name)
        blob_name = blob_name or validate_file_or_path(filename)
        blob = bucket.blob(blob_name)

        # Default Content-Type is application/octet-stream for upload_from_file
        if not content_type:
            content_type = file_content_type(blob.name)

        if isinstance(filename, str):
            blob.upload_from_filename(filename=filename,
                                      content_type=content_type)
        else:
            blob.upload_from_file(file_obj=filename, content_type=content_type)

        if acl:
            blob.acl.save_predefined(acl)

        # Google object metadata (Content-Type set above)
        for attr_name, attr_value in extra_args.items():
            # Options that were never supplied are None; assigning them would
            # overwrite values set by the upload (e.g. the guessed
            # Content-Type) when the blob is patched.
            if attr_value is None:
                continue

            if attr_name and hasattr(blob, attr_name):
                setattr(blob, attr_name, attr_value)

        blob.patch()
        return self._make_blob(container, blob)

    def get_blob(self, container: Container, blob_name: str) -> Blob:
        """Get a blob by name.

        :param container: Container that holds the blob.
        :type container: :class:`.Container`

        :param blob_name: Name of the blob to get.
        :type blob_name: str

        :return: The blob matching the name provided.
        :rtype: :class:`.Blob`

        :raise NotFoundError: If the bucket or the blob does not exist.
        """
        g_blob = self._get_blob(container.name, blob_name)
        return self._make_blob(container, g_blob)

    def get_blobs(self, container: Container) -> Iterable[Blob]:
        """Yield a :class:`.Blob` for every blob listed in the bucket.

        :param container: Container to list.
        :type container: :class:`.Container`

        :return: Generator of blobs.
        :rtype: Iterable[:class:`.Blob`]

        :raise NotFoundError: If the bucket does not exist.
        """
        bucket = self._get_bucket(container.name)
        for blob in bucket.list_blobs():
            yield self._make_blob(container, blob)

    def download_blob(self, blob: Blob,
                      destination: FileLike) -> None:
        """Download a blob to a file path or file like object.

        :param blob: Blob to download.
        :type blob: :class:`.Blob`

        :param destination: File path (`str`) or file like object to write
          the content to.
        :type destination: str or file

        :raise NotFoundError: If the bucket or the blob does not exist.
        """
        g_blob = self._get_blob(blob.container.name, blob.name)

        if isinstance(destination, str):
            g_blob.download_to_filename(destination)
        else:
            g_blob.download_to_file(destination)

    def patch_blob(self, blob: Blob) -> None:
        """Not implemented for this driver.

        :raise NotImplementedError: Always.
        """
        raise NotImplementedError

    def delete_blob(self, blob: Blob) -> None:
        """Delete a blob.

        :param blob: Blob to delete.
        :type blob: :class:`.Blob`

        :raise NotFoundError: If the bucket or the blob does not exist.
        """
        g_blob = self._get_blob(blob.container.name, blob.name)
        g_blob.delete()

    def blob_cdn_url(self, blob: Blob) -> str:
        """Return the `public_url` of the Google blob.

        :param blob: Blob instance.
        :type blob: :class:`.Blob`

        :return: Public URL of the blob.
        :rtype: str

        :raise NotFoundError: If the bucket or the blob does not exist.
        """
        return self._get_blob(blob.container.name, blob.name).public_url

    def generate_container_upload_url(self, container: Container,
                                      blob_name: str,
                                      expires: int = 3600, acl: str = None,
                                      meta_data: MetaData = None,
                                      content_disposition: str = None,
                                      content_length: ContentLength = None,
                                      content_type: str = None,
                                      cache_control: str = None,
                                      extra: ExtraOptions = None) -> FormPost:
        """Generate the URL and form fields for a signed POST upload.

        :param container: Container to upload to.
        :type container: :class:`.Container`

        :param blob_name: Blob name. Empty: the uploaded file name is used.
          With a file suffix: used as is. Without a suffix: treated as a
          prefix to the uploaded file name.
        :type blob_name: str or None

        :param expires: Seconds from now until the policy expires.
        :type expires: int

        :param acl: (optional) ACL added as field and policy condition.
        :type acl: str or None

        :param meta_data: (optional) Meta data; each name is prefixed with
          ``x-goog-meta-`` and added as field and policy condition.
        :type meta_data: dict or None

        :param content_disposition: (optional) Required Content-Disposition.
        :type content_disposition: str or None

        :param content_length: (optional) `(min, max)` size range.
        :type content_length: tuple[int, int] or None

        :param content_type: (optional) Required Content-Type.
        :type content_type: str or None

        :param cache_control: (optional) Required Cache-Control.
        :type cache_control: str or None

        :param extra: (optional) Extra options; only keys listed in
          `_POST_OBJECT_KEYS` are used.
        :type extra: dict or None

        :return: Dictionary with the keys `url` and `fields`.
        :rtype: dict

        :raise NotFoundError: If the bucket does not exist.
        """
        meta_data = meta_data if meta_data is not None else {}
        extra = extra if extra is not None else {}
        extra_norm = self._normalize_parameters(extra, self._POST_OBJECT_KEYS)

        bucket = self._get_bucket(container.name)

        conditions = [
            # file name can start with any valid character.
            ['starts-with', '$key', '']
        ]  # type: List[Any]
        fields = {}  # type: Dict[str, Any]

        if acl:
            conditions.append({'acl': acl})
            fields['acl'] = acl

        headers = {
            'Content-Disposition': content_disposition,
            'Content-Type': content_type,
            'Cache-Control': cache_control,
        }
        for header_name, header_value in headers.items():
            if not header_value:
                continue

            fields[header_name.lower()] = header_value
            conditions.append(['eq', '$' + header_name, header_value])

        # Add content-length-range which is a tuple
        if content_length:
            min_range, max_range = content_length
            conditions.append(['content-length-range', min_range, max_range])

        for meta_name, meta_value in meta_data.items():
            meta_name = self._OBJECT_META_PREFIX + meta_name
            fields[meta_name] = meta_value
            conditions.append({meta_name: meta_value})

        # Add extra conditions and fields
        for extra_name, extra_value in extra_norm.items():
            fields[extra_name] = extra_value
            conditions.append({extra_name: extra_value})

        # Determine key value for blob name when uploaded
        if not blob_name:  # user provided filename
            fields['key'] = '${filename}'
        else:
            path = pathlib.Path(blob_name)
            if path.suffix:  # blob_name is filename
                fields['key'] = blob_name
            else:  # prefix + user provided filename
                fields['key'] = blob_name + '${filename}'

        logger.debug('conditions=%s', conditions)
        logger.debug('fields=%s', fields)

        expiration = datetime.utcnow() + timedelta(seconds=expires)
        # noinspection PyTypeChecker
        policy = bucket.generate_upload_policy(conditions=conditions,
                                               expiration=expiration)
        # The values returned by generate_upload_policy are sent as form
        # fields together with the ones collected above.
        fields.update(policy)

        url = 'https://{bucket_name}.storage.googleapis.com'.format(
            bucket_name=container.name)
        return {'url': url, 'fields': fields}

    def generate_blob_download_url(self, blob: Blob, expires: int = 3600,
                                   method: str = 'GET',
                                   content_disposition: str = None,
                                   extra: ExtraOptions = None) -> str:
        """Generate a signed URL for a blob.

        :param blob: Blob to generate the URL for.
        :type blob: :class:`.Blob`

        :param expires: Seconds from now until the URL expires.
        :type expires: int

        :param method: HTTP method the URL is signed for; upper-cased before
          use.
        :type method: str

        :param content_disposition: (optional) Passed as
          `response_disposition`; takes precedence over a
          `content_disposition` entry in `extra`.
        :type content_disposition: str or None

        :param extra: (optional) Extra options; only keys listed in
          `_GET_OBJECT_KEYS` are used (`content_type`,
          `content_disposition`, `version`).
        :type extra: dict or None

        :return: The signed URL.
        :rtype: str

        :raise NotFoundError: If the bucket or the blob does not exist.
        """
        extra = extra if extra is not None else {}
        params = self._normalize_parameters(extra, self._GET_OBJECT_KEYS)

        expiration = timedelta(seconds=int(expires))
        method_norm = method.upper()

        # `params` is keyed by the *normalized* names from _GET_OBJECT_KEYS,
        # so the values must be read back under those names.
        response_type = params.get('response_type', None)
        response_disposition = (content_disposition or
                                params.get('response_disposition', None))
        generation = params.get('generation', None)

        g_blob = self._get_blob(blob.container.name, blob.name)
        return g_blob.generate_signed_url(
            expiration=expiration, method=method_norm,
            content_type='', generation=generation,
            response_disposition=response_disposition,
            response_type=response_type)