"""Google Cloud Storage Driver."""
import base64
import codecs
import logging
import os
import pathlib
from datetime import datetime, timedelta
try:
from http import HTTPStatus
except ImportError:
# noinspection PyUnresolvedReferences
from httpstatus import HTTPStatus
from typing import Dict, Iterable, List, Union
# noinspection PyPackageRequirements
from google.cloud import storage
# noinspection PyPackageRequirements
from google.cloud.exceptions import Conflict, NotFound
# noinspection PyPackageRequirements
from google.cloud.storage.blob import Blob as GoogleBlob
# noinspection PyPackageRequirements
from google.cloud.storage.bucket import Bucket
from inflection import underscore
from cloudstorage.base import (Blob, Container, ContentLength, Driver,
ExtraOptions, FileLike, FormPost, MetaData)
from cloudstorage.exceptions import (CloudStorageError, IsNotEmptyError,
NotFoundError)
from cloudstorage.helpers import file_content_type, validate_file_or_path
from cloudstorage.messages import *
logger = logging.getLogger(__name__)
class GoogleStorageDriver(Driver):
    """Driver for interacting with Google Cloud Storage.

    The driver will check for `GOOGLE_APPLICATION_CREDENTIALS` environment
    variable before connecting. If not found, the driver will use service
    worker credentials json file path passed to `key` argument.

    .. code-block:: python

        from cloudstorage.drivers.google import GoogleStorageDriver

        credentials_json_file = '/path/cloud-storage-service-account.json'
        storage = GoogleStorageDriver(key=credentials_json_file)
        # <Driver: GOOGLESTORAGE>

    .. todo: Support for container or blob encryption key.
    .. todo: Support for buckets with more than 256 objects on iteration.

    References:

    * `Google Cloud Storage Documentation
      <https://cloud.google.com/storage/docs>`_
    * `Storage Client <https://googlecloudplatform.github.io/
      google-cloud-python/stable/storage-client.html>`_
    * `snippets.py
      <https://github.com/GoogleCloudPlatform/python-docs-samples/blob/
      master/storage/cloud-client/snippets.py>`_

    :param key: (optional) File path to service worker credentials json file.
    :type key: str or None

    :param kwargs: (optional) Catch invalid options.
    :type kwargs: dict

    :raise CloudStorageError: If `GOOGLE_APPLICATION_CREDENTIALS` environment
                              variable is not set and/or credentials json file
                              is not passed to the `key` argument.
    """
    name = 'GOOGLESTORAGE'
    hash_type = 'md5'  # TODO: QUESTION: Switch to crc32c?
    url = 'https://cloud.google.com/storage'

    def __init__(self, key: str = None, **kwargs: Dict) -> None:
        super().__init__(key=key)

        google_application_credentials = os.getenv(self._CREDENTIALS_ENV_NAME)

        # Set environment variable using credentials json file path.
        if not google_application_credentials:
            # Guard `key is None` explicitly: os.path.isfile(None) would
            # raise TypeError instead of the intended CloudStorageError.
            if not key or not os.path.isfile(key):
                raise CloudStorageError(
                    "Please set environment variable "
                    "'GOOGLE_APPLICATION_CREDENTIALS' or provide file path "
                    "to Google service account key json file.")

            os.environ[self._CREDENTIALS_ENV_NAME] = key

        self._client = storage.Client()

    def __iter__(self) -> Iterable[Container]:
        for bucket in self.client.list_buckets():
            yield self._make_container(bucket)

    def __len__(self) -> int:
        containers = self.client.list_buckets()
        return len(list(containers))

    @staticmethod
    def _normalize_parameters(params: Dict[str, str],
                              normalizers: Dict[str, str]) -> Dict[str, str]:
        """Remap parameter names through `normalizers`, dropping empty values
        and parameters without a known normalized name."""
        normalized = params.copy()

        for key, value in params.items():
            normalized.pop(key)
            if not value:
                continue

            key_inflected = underscore(key).lower()

            # Only include parameters found in normalizers
            key_overrider = normalizers.get(key_inflected)
            if key_overrider:
                normalized[key_overrider] = value

        return normalized

    def _get_blob(self, bucket_name: str, blob_name: str) -> GoogleBlob:
        """Get a blob object by name.

        :param bucket_name: The name of the container that contains the blob.
        :type bucket_name: str

        :param blob_name: The name of the blob to get.
        :type blob_name: str

        :return: The blob object if it exists.
        :rtype: :class:`google.cloud.storage.blob.Blob`

        :raise NotFoundError: If the blob does not exist.
        """
        bucket = self._get_bucket(bucket_name)

        blob = bucket.get_blob(blob_name)
        if not blob:
            raise NotFoundError(blob_not_found % (blob_name, bucket_name))

        return blob

    def _get_bucket(self, bucket_name: str) -> Bucket:
        """Get a bucket by name.

        :param bucket_name: The name of the bucket to get.
        :type bucket_name: str

        :return: The bucket matching the name provided.
        :rtype: :class:`google.cloud.storage.bucket.Bucket`

        :raise NotFoundError: If the bucket does not exist.
        """
        try:
            return self.client.get_bucket(bucket_name)
        except NotFound:
            raise NotFoundError(container_not_found % bucket_name)

    def _make_container(self, bucket: Bucket) -> Container:
        """Convert Google Storage Bucket to Cloud Storage Container.

        :param bucket: The bucket to convert.
        :type bucket: :class:`google.cloud.storage.bucket.Bucket`

        :return: A container instance.
        :rtype: :class:`.Container`
        """
        acl = bucket.acl
        created_at = bucket.time_created.astimezone(tz=None)
        return Container(name=bucket.name, driver=self, acl=acl,
                         meta_data=None, created_at=created_at)

    def _make_blob(self, container: Container, blob: GoogleBlob) -> Blob:
        """Convert Google Storage Blob to a Cloud Storage Blob.

        References:

        * `Objects
          <https://cloud.google.com/storage/docs/json_api/v1/objects>`_

        :param container: Container instance.
        :type container: :class:`.Container`

        :param blob: Google Storage blob.
        :type blob: :class:`google.cloud.storage.blob.Blob`

        :return: Blob instance.
        :rtype: :class:`.Blob`
        """
        # Google returns etag and md5 base64 encoded; expose them as hex.
        etag_bytes = base64.b64decode(blob.etag)
        try:
            etag = etag_bytes.hex()
        except AttributeError:
            # Python 3.4: 'bytes' object has no attribute 'hex'
            etag = codecs.encode(etag_bytes, 'hex_codec').decode('ascii')

        md5_bytes = base64.b64decode(blob.md5_hash)
        try:
            md5_hash = md5_bytes.hex()
        except AttributeError:
            # Python 3.4: 'bytes' object has no attribute 'hex'
            md5_hash = codecs.encode(md5_bytes, 'hex_codec').decode('ascii')

        return Blob(name=blob.name, checksum=md5_hash, etag=etag,
                    size=blob.size, container=container, driver=self,
                    acl=blob.acl, meta_data=blob.metadata,
                    content_disposition=blob.content_disposition,
                    content_type=blob.content_type,
                    created_at=blob.time_created,
                    modified_at=blob.updated)

    @property
    def client(self) -> storage.client.Client:
        """The client bound to this driver.

        :return: Client for interacting with the Google Cloud Storage API.
        :rtype: :class:`google.cloud.storage.client.Client`
        """
        return self._client

    @property
    def regions(self) -> List[str]:
        logger.warning('Regions not supported.')
        return []

    def create_container(self, container_name: str, acl: str = None,
                         meta_data: MetaData = None) -> Container:
        if meta_data:
            logger.warning(option_not_supported % 'meta_data')

        try:
            bucket = self.client.create_bucket(container_name)
        except Conflict:
            # Bucket already exists; fall back to fetching it.
            logger.debug(container_exists % container_name)
            bucket = self._get_bucket(container_name)
        except ValueError as e:
            raise CloudStorageError(str(e))

        if acl:
            bucket.acl.save_predefined(acl)

        return self._make_container(bucket)

    def get_container(self, container_name: str) -> Container:
        bucket = self._get_bucket(container_name)
        return self._make_container(bucket)

    def patch_container(self, container: Container) -> None:
        raise NotImplementedError

    def delete_container(self, container: Container) -> None:
        bucket = self._get_bucket(container.name)

        try:
            bucket.delete()
        except Conflict as e:
            if e.code == HTTPStatus.CONFLICT:
                raise IsNotEmptyError(container_not_empty % bucket.name)
            raise

    def container_cdn_url(self, container: Container) -> str:
        return 'https://storage.googleapis.com/%s' % container.name

    def enable_container_cdn(self, container: Container) -> bool:
        bucket = self._get_bucket(container.name)
        bucket.make_public(recursive=True, future=True)
        return True

    def disable_container_cdn(self, container: Container) -> bool:
        bucket = self._get_bucket(container.name)
        bucket.acl.all().revoke_read()  # opposite of make_public
        bucket.acl.save()
        return True

    def upload_blob(self, container: Container,
                    filename: Union[str, FileLike],
                    blob_name: str = None, acl: str = None,
                    meta_data: MetaData = None, content_type: str = None,
                    content_disposition: str = None,
                    extra: ExtraOptions = None) -> Blob:
        extra = extra if extra is not None else {}

        extra_args = self._normalize_parameters(extra, self._PUT_OBJECT_KEYS)
        extra_args.setdefault('metadata', meta_data)
        extra_args.setdefault('content_type', content_type)
        extra_args.setdefault('content_disposition', content_disposition)

        bucket = self._get_bucket(container.name)

        blob_name = blob_name or validate_file_or_path(filename)
        blob = bucket.blob(blob_name)

        # Default Content-Type is application/octet-stream for
        # upload_from_file; guess a better one from the blob name.
        if not content_type:
            content_type = file_content_type(blob.name)

        if isinstance(filename, str):
            blob.upload_from_filename(filename=filename,
                                      content_type=content_type)
        else:
            blob.upload_from_file(file_obj=filename,
                                  content_type=content_type)

        if acl:
            blob.acl.save_predefined(acl)

        # Google object metadata (Content-Type set above)
        for attr_name, attr_value in extra_args.items():
            if attr_name and hasattr(blob, attr_name):
                setattr(blob, attr_name, attr_value)

        blob.patch()
        return self._make_blob(container, blob)

    def get_blob(self, container: Container, blob_name: str) -> Blob:
        blob = self._get_blob(container.name, blob_name)
        return self._make_blob(container, blob)

    def get_blobs(self, container: Container) -> Iterable[Blob]:
        bucket = self._get_bucket(container.name)
        for blob in bucket.list_blobs():
            yield self._make_blob(container, blob)

    def download_blob(self, blob: Blob,
                      destination: Union[str, FileLike]) -> None:
        blob = self._get_blob(blob.container.name, blob.name)

        if isinstance(destination, str):
            blob.download_to_filename(destination)
        else:
            blob.download_to_file(destination)

    def patch_blob(self, blob: Blob) -> None:
        raise NotImplementedError

    def delete_blob(self, blob: Blob) -> None:
        blob_ = self._get_blob(blob.container.name, blob.name)
        blob_.delete()

    def blob_cdn_url(self, blob: Blob) -> str:
        return self._get_blob(blob.container.name, blob.name).public_url

    def generate_container_upload_url(self, container: Container,
                                      blob_name: str,
                                      expires: int = 3600, acl: str = None,
                                      meta_data: MetaData = None,
                                      content_disposition: str = None,
                                      content_length: ContentLength = None,
                                      content_type: str = None,
                                      extra: ExtraOptions = None) -> FormPost:
        meta_data = meta_data if meta_data is not None else {}
        extra = extra if extra is not None else {}

        extra_norm = self._normalize_parameters(extra, self._POST_OBJECT_KEYS)

        bucket = self._get_bucket(container.name)

        conditions = [
            # file name can start with any valid character.
            ['starts-with', '$key', '']
        ]
        fields = {}

        if acl:
            # noinspection PyTypeChecker
            conditions.append({'acl': acl})
            fields['acl'] = acl

        headers = {
            'Content-Disposition': content_disposition,
            'Content-Type': content_type,
        }
        for header_name, header_value in headers.items():
            if not header_value:
                continue

            fields[header_name.lower()] = header_value
            conditions.append(['eq', '$' + header_name, header_value])

        # Add content-length-range which is a tuple
        if content_length:
            min_range, max_range = content_length
            conditions.append(['content-length-range', min_range, max_range])

        for meta_name, meta_value in meta_data.items():
            meta_name = self._OBJECT_META_PREFIX + meta_name
            fields[meta_name] = meta_value
            # noinspection PyTypeChecker
            conditions.append({meta_name: meta_value})

        # Add extra conditions and fields
        for extra_name, extra_value in extra_norm.items():
            fields[extra_name] = extra_value
            # noinspection PyTypeChecker
            conditions.append({extra_name: extra_value})

        # Determine key value for blob name when uploaded.
        # '${filename}' is the GCS POST-object variable substituted with the
        # name of the file the user uploads.
        if not blob_name:
            # user provided filename
            fields['key'] = '${filename}'
        else:
            path = pathlib.Path(blob_name)
            if path.suffix:
                # blob_name is filename
                fields['key'] = blob_name
            else:
                # prefix + user provided filename
                fields['key'] = blob_name + '${filename}'

        logger.debug('conditions=%s', conditions)
        logger.debug('fields=%s', fields)

        expiration = datetime.utcnow() + timedelta(seconds=expires)

        # noinspection PyTypeChecker
        policy = bucket.generate_upload_policy(conditions=conditions,
                                               expiration=expiration)
        fields.update(policy)

        url = 'https://{bucket_name}.storage.googleapis.com'.format(
            bucket_name=container.name)
        return {'url': url, 'fields': fields}

    def generate_blob_download_url(self, blob: Blob, expires: int = 3600,
                                   method: str = 'GET',
                                   content_disposition: str = None,
                                   extra: ExtraOptions = None) -> str:
        extra = extra if extra is not None else {}
        params = self._normalize_parameters(extra, self._GET_OBJECT_KEYS)

        expiration = timedelta(seconds=int(expires))
        method_norm = method.upper()
        response_type = params.get('content_type', None)
        generation = params.get('version', None)

        blob = self._get_blob(blob.container.name, blob.name)
        return blob.generate_signed_url(
            expiration=expiration, method=method_norm,
            content_type='', generation=generation,
            response_disposition=content_disposition,
            response_type=response_type)

    #: Environment variable holding the service account key file path.
    _CREDENTIALS_ENV_NAME = 'GOOGLE_APPLICATION_CREDENTIALS'

    #: Prefix for custom object metadata form fields / headers.
    _OBJECT_META_PREFIX = 'x-goog-meta-'

    #: `insert-object
    #: <https://cloud.google.com/storage/docs/json_api/v1/objects/insert>`
    #: Mapping is for blob class attribute names
    _PUT_OBJECT_KEYS = {
        'acl': 'acl',
        'bucket': 'bucket',
        'cache_control': 'cache_control',
        'content_disposition': 'content_disposition',
        'content_encoding': 'content_encoding',
        'content_length': 'content_length',
        'content_type': 'content_type',
        'expires': 'expires',
        'meta_data': 'metadata',
    }

    #: `post-object
    #: <https://cloud.google.com/storage/docs/xml-api/post-object>`_
    _POST_OBJECT_KEYS = {
        'acl': 'acl',
        'bucket': 'bucket',
        'cache_control': 'Cache-Control',
        'content_disposition': 'Content-Disposition',
        'content_encoding': 'Content-Encoding',
        'content_length': 'Content-Length',
        'content_type': 'Content-Type',
        'expires': 'Expires',
        'key': 'Key',
        'success_action_redirect': 'success_action_redirect',
        'success_action_status': 'success_action_status',
        'meta_data': 'Metadata',
        'x_goog_meta_': 'x-goog-meta-',
        'content_length_range': 'content-length-range',
    }

    #: `get-object
    #: <https://cloud.google.com/storage/docs/xml-api/get-object>`_
    _GET_OBJECT_KEYS = {
        'content_disposition': 'response_disposition',
        'content_type': 'response_type',
    }