
"""Google Cloud Storage Driver."""

import base64
import codecs
import logging
import os
import pathlib
from datetime import datetime, timedelta

try:
    from http import HTTPStatus
except ImportError:
    # noinspection PyUnresolvedReferences
    from httpstatus import HTTPStatus
from typing import Dict, Iterable, List, Union

# noinspection PyPackageRequirements
from google.cloud import storage
# noinspection PyPackageRequirements
from google.cloud.exceptions import Conflict, NotFound
# noinspection PyPackageRequirements
from google.cloud.storage.blob import Blob as GoogleBlob
# noinspection PyPackageRequirements
from google.cloud.storage.bucket import Bucket
from inflection import underscore

from cloudstorage.base import (Blob, Container, ContentLength, Driver,
                               ExtraOptions, FileLike, FormPost, MetaData)
from cloudstorage.exceptions import (CloudStorageError, IsNotEmptyError,
                                     NotFoundError)
from cloudstorage.helpers import file_content_type, validate_file_or_path
from cloudstorage.messages import *

logger = logging.getLogger(__name__)


class GoogleStorageDriver(Driver):
    """Driver for interacting with Google Cloud Storage.

    The driver will check for the `GOOGLE_APPLICATION_CREDENTIALS` environment
    variable before connecting. If it is not found, the driver will use the
    service account credentials json file path passed to the `key` argument.

    .. code-block:: python

        from cloudstorage.drivers.google import GoogleStorageDriver

        credentials_json_file = '/path/cloud-storage-service-account.json'
        storage = GoogleStorageDriver(key=credentials_json_file)
        # <Driver: GOOGLESTORAGE>

    .. todo:: Support for container or blob encryption key.

    .. todo:: Support for buckets with more than 256 objects on iteration.

    References:

    * `Google Cloud Storage Documentation
      <https://cloud.google.com/storage/docs>`_
    * `Storage Client
      <https://googlecloudplatform.github.io/google-cloud-python/stable/storage-client.html>`_
    * `snippets.py
      <https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/storage/cloud-client/snippets.py>`_

    :param key: (optional) File path to service account credentials json file.
    :type key: str or None

    :raise CloudStorageError: If `GOOGLE_APPLICATION_CREDENTIALS` environment
      variable is not set and/or the credentials json file is not passed to
      the `key` argument.
    """
    name = 'GOOGLESTORAGE'
    hash_type = 'md5'  # TODO: QUESTION: Switch to crc32c?
    url = 'https://cloud.google.com/storage'

    def __init__(self, key: str = None) -> None:
        super().__init__(key=key)

        google_application_credentials = os.getenv(self._CREDENTIALS_ENV_NAME)

        # Set environment variable using credentials json file path.
        if not google_application_credentials:
            if not os.path.isfile(key):
                raise CloudStorageError(
                    "Please set environment variable "
                    "'GOOGLE_APPLICATION_CREDENTIALS' or provide a file path "
                    "to the Google service account key json file.")

            os.environ[self._CREDENTIALS_ENV_NAME] = key

        self._client = storage.Client()

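    # Alternatively (a minimal sketch), set the environment variable before
    # instantiating the driver and omit the `key` argument:
    #
    #   os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = \
    #       '/path/cloud-storage-service-account.json'
    #   storage = GoogleStorageDriver()
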
    def __iter__(self) -> Iterable[Container]:
        for bucket in self.client.list_buckets():
            yield self._make_container(bucket)

    def __len__(self) -> int:
        containers = self.client.list_buckets()
        return len(list(containers))

    @staticmethod
    def _normalize_parameters(params: Dict[str, str],
                              normalizers: Dict[str, str]) -> Dict[str, str]:
        normalized = params.copy()

        for key, value in params.items():
            normalized.pop(key)
            if not value:
                continue

            key_inflected = underscore(key).lower()

            # Only include parameters found in normalizers
            key_overrider = normalizers.get(key_inflected)
            if key_overrider:
                normalized[key_overrider] = value

        return normalized

    def _get_blob(self, bucket_name: str, blob_name: str) -> GoogleBlob:
        """Get a blob object by name.

        :param bucket_name: The name of the container that contains the blob.
        :type bucket_name: str

        :param blob_name: The name of the blob to get.
        :type blob_name: str

        :return: The blob object if it exists.
        :rtype: :class:`google.cloud.storage.blob.Blob`
        """
        bucket = self._get_bucket(bucket_name)

        blob = bucket.get_blob(blob_name)
        if not blob:
            raise NotFoundError(blob_not_found % (blob_name, bucket_name))

        return blob

    def _get_bucket(self, bucket_name: str) -> Bucket:
        """Get a bucket by name.

        :param bucket_name: The name of the bucket to get.
        :type bucket_name: str

        :return: The bucket matching the name provided.
        :rtype: :class:`google.cloud.storage.bucket.Bucket`
        """
        try:
            return self.client.get_bucket(bucket_name)
        except NotFound:
            raise NotFoundError(container_not_found % bucket_name)

    def _make_container(self, bucket: Bucket) -> Container:
        """Convert Google Storage Bucket to Cloud Storage Container.

        :param bucket: The bucket to convert.
        :type bucket: :class:`google.cloud.storage.bucket.Bucket`

        :return: A container instance.
        :rtype: :class:`.Container`
        """
        acl = bucket.acl
        created_at = bucket.time_created.astimezone(tz=None)
        return Container(name=bucket.name, driver=self, acl=acl,
                         meta_data=None, created_at=created_at)

    def _make_blob(self, container: Container, blob: GoogleBlob) -> Blob:
        """Convert Google Storage Blob to a Cloud Storage Blob.

        References:

        * `Objects
          <https://cloud.google.com/storage/docs/json_api/v1/objects>`_

        :param container: Container instance.
        :type container: :class:`.Container`

        :param blob: Google Storage blob.
        :type blob: :class:`google.cloud.storage.blob.Blob`

        :return: Blob instance.
        :rtype: :class:`.Blob`
        """
        etag_bytes = base64.b64decode(blob.etag)
        try:
            etag = etag_bytes.hex()
        except AttributeError:
            # Python 3.4: 'bytes' object has no attribute 'hex'
            etag = codecs.encode(etag_bytes, 'hex_codec').decode('ascii')

        md5_bytes = base64.b64decode(blob.md5_hash)
        try:
            md5_hash = md5_bytes.hex()
        except AttributeError:
            # Python 3.4: 'bytes' object has no attribute 'hex'
            md5_hash = codecs.encode(md5_bytes, 'hex_codec').decode('ascii')

        return Blob(name=blob.name,
                    checksum=md5_hash,
                    etag=etag,
                    size=blob.size,
                    container=container,
                    driver=self,
                    acl=blob.acl,
                    meta_data=blob.metadata,
                    content_disposition=blob.content_disposition,
                    content_type=blob.content_type,
                    created_at=blob.time_created,
                    modified_at=blob.updated)

    @property
    def client(self) -> storage.client.Client:
        """The client bound to this driver.

        :return: Client for interacting with the Google Cloud Storage API.
        :rtype: :class:`google.cloud.storage.client.Client`
        """
        return self._client

    @property
    def regions(self) -> List[str]:
        logger.warning('Regions not supported.')
        return []

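    # _normalize_parameters sketch (the keys shown are illustrative): empty
    # values are dropped, and only keys listed in the normalizer mapping
    # survive, renamed to their mapped form:
    #
    #   GoogleStorageDriver._normalize_parameters(
    #       {'ContentType': 'text/plain', 'cache_control': ''},
    #       GoogleStorageDriver._PUT_OBJECT_KEYS)
    #   # -> {'content_type': 'text/plain'}
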
    def create_container(self, container_name: str, acl: str = None,
                         meta_data: MetaData = None) -> Container:
        if meta_data:
            logger.warning(option_not_supported % 'meta_data')

        try:
            bucket = self.client.create_bucket(container_name)
        except Conflict:
            logger.debug(container_exists % container_name)
            bucket = self._get_bucket(container_name)
        except ValueError as e:
            raise CloudStorageError(str(e))

        if acl:
            bucket.acl.save_predefined(acl)

        return self._make_container(bucket)

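    # Example (a minimal sketch; the container name is hypothetical):
    #
    #   container = storage.create_container('container-name',
    #                                        acl='public-read')
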
    def get_container(self, container_name: str) -> Container:
        bucket = self._get_bucket(container_name)
        return self._make_container(bucket)

    def patch_container(self, container: Container) -> None:
        raise NotImplementedError

    def delete_container(self, container: Container) -> None:
        bucket = self._get_bucket(container.name)

        try:
            bucket.delete()
        except Conflict as e:
            if e.code == HTTPStatus.CONFLICT:
                raise IsNotEmptyError(container_not_empty % bucket.name)
            raise

    def container_cdn_url(self, container: Container) -> str:
        return 'https://storage.googleapis.com/%s' % container.name

    def enable_container_cdn(self, container: Container) -> bool:
        bucket = self._get_bucket(container.name)
        bucket.make_public(recursive=True, future=True)
        return True

    def disable_container_cdn(self, container: Container) -> bool:
        bucket = self._get_bucket(container.name)
        bucket.acl.all().revoke_read()  # opposite of make_public
        bucket.acl.save()
        return True

    def upload_blob(self, container: Container,
                    filename: Union[str, FileLike],
                    blob_name: str = None, acl: str = None,
                    meta_data: MetaData = None, content_type: str = None,
                    content_disposition: str = None,
                    extra: ExtraOptions = None) -> Blob:
        extra = extra if extra is not None else {}

        extra_args = self._normalize_parameters(extra, self._PUT_OBJECT_KEYS)
        extra_args.setdefault('metadata', meta_data)
        extra_args.setdefault('content_type', content_type)
        extra_args.setdefault('content_disposition', content_disposition)

        bucket = self._get_bucket(container.name)

        blob_name = blob_name or validate_file_or_path(filename)
        blob = bucket.blob(blob_name)

        # Default Content-Type is application/octet-stream for
        # upload_from_file
        if not content_type:
            content_type = file_content_type(blob.name)

        if isinstance(filename, str):
            blob.upload_from_filename(filename=filename,
                                      content_type=content_type)
        else:
            blob.upload_from_file(file_obj=filename,
                                  content_type=content_type)

        if acl:
            blob.acl.save_predefined(acl)

        # Google object metadata (Content-Type set above)
        for attr_name, attr_value in extra_args.items():
            if attr_name and hasattr(blob, attr_name):
                setattr(blob, attr_name, attr_value)

        blob.patch()
        return self._make_blob(container, blob)

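    # Example (a minimal sketch; the path and names are hypothetical):
    #
    #   blob = storage.upload_blob(container,
    #                              filename='/path/picture.png',
    #                              meta_data={'owner': 'user-id'},
    #                              content_type='image/png')
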
    def get_blob(self, container: Container, blob_name: str) -> Blob:
        blob = self._get_blob(container.name, blob_name)
        return self._make_blob(container, blob)

    def get_blobs(self, container: Container) -> Iterable[Blob]:
        bucket = self._get_bucket(container.name)
        for blob in bucket.list_blobs():
            yield self._make_blob(container, blob)

    def download_blob(self, blob: Blob,
                      destination: Union[str, FileLike]) -> None:
        blob = self._get_blob(blob.container.name, blob.name)

        if isinstance(destination, str):
            blob.download_to_filename(destination)
        else:
            blob.download_to_file(destination)

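    # Example (a minimal sketch; the path is hypothetical). The destination
    # may be a file path or a binary file object:
    #
    #   storage.download_blob(blob, '/tmp/picture.png')
    #
    #   with open('/tmp/picture.png', 'wb') as download_file:
    #       storage.download_blob(blob, download_file)
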
    def patch_blob(self, blob: Blob) -> None:
        raise NotImplementedError

    def delete_blob(self, blob: Blob) -> None:
        blob_ = self._get_blob(blob.container.name, blob.name)
        blob_.delete()

    def blob_cdn_url(self, blob: Blob) -> str:
        return self._get_blob(blob.container.name, blob.name).public_url

    def generate_container_upload_url(self, container: Container,
                                      blob_name: str,
                                      expires: int = 3600, acl: str = None,
                                      meta_data: MetaData = None,
                                      content_disposition: str = None,
                                      content_length: ContentLength = None,
                                      content_type: str = None,
                                      extra: ExtraOptions = None) -> FormPost:
        meta_data = meta_data if meta_data is not None else {}
        extra = extra if extra is not None else {}
        extra_norm = self._normalize_parameters(extra, self._POST_OBJECT_KEYS)

        bucket = self._get_bucket(container.name)

        conditions = [
            # file name can start with any valid character.
            ['starts-with', '$key', '']
        ]
        fields = {}

        if acl:
            # noinspection PyTypeChecker
            conditions.append({'acl': acl})
            fields['acl'] = acl

        headers = {
            'Content-Disposition': content_disposition,
            'Content-Type': content_type,
        }
        for header_name, header_value in headers.items():
            if not header_value:
                continue

            fields[header_name.lower()] = header_value
            conditions.append(['eq', '$' + header_name, header_value])

        # Add content-length-range which is a tuple
        if content_length:
            min_range, max_range = content_length
            conditions.append(['content-length-range', min_range, max_range])

        for meta_name, meta_value in meta_data.items():
            meta_name = self._OBJECT_META_PREFIX + meta_name
            fields[meta_name] = meta_value
            # noinspection PyTypeChecker
            conditions.append({meta_name: meta_value})

        # Add extra conditions and fields
        for extra_name, extra_value in extra_norm.items():
            fields[extra_name] = extra_value
            # noinspection PyTypeChecker
            conditions.append({extra_name: extra_value})

        # Determine key value for blob name when uploaded
        if not blob_name:  # user provided filename
            fields['key'] = '${filename}'
        else:
            path = pathlib.Path(blob_name)
            if path.suffix:  # blob_name is filename
                fields['key'] = blob_name
            else:  # prefix + user provided filename
                fields['key'] = blob_name + '${filename}'

        logger.debug('conditions=%s' % conditions)
        logger.debug('fields=%s' % fields)

        expiration = datetime.utcnow() + timedelta(seconds=expires)

        # noinspection PyTypeChecker
        policy = bucket.generate_upload_policy(conditions=conditions,
                                               expiration=expiration)
        fields.update(policy)

        url = 'https://{bucket_name}.storage.googleapis.com'.format(
            bucket_name=container.name)
        return {'url': url, 'fields': fields}

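    # Example (a minimal sketch; assumes the third-party `requests` package
    # and a local file 'image.png'). The returned dict feeds a multipart
    # POST directly:
    #
    #   import requests
    #
    #   form_post = storage.generate_container_upload_url(
    #       container, 'avatars/', expires=3600)
    #   with open('image.png', 'rb') as upload_file:
    #       response = requests.post(form_post['url'],
    #                                data=form_post['fields'],
    #                                files={'file': upload_file})
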
    def generate_blob_download_url(self, blob: Blob, expires: int = 3600,
                                   method: str = 'GET',
                                   content_disposition: str = None,
                                   extra: ExtraOptions = None) -> str:
        extra = extra if extra is not None else {}
        params = self._normalize_parameters(extra, self._GET_OBJECT_KEYS)

        expiration = timedelta(seconds=int(expires))
        method_norm = method.upper()
        # `content_type` was renamed to `response_type` by the normalizer
        # mapping above, so look it up under its normalized name.
        response_type = params.get('response_type', None)
        generation = params.get('version', None)

        blob = self._get_blob(blob.container.name, blob.name)
        return blob.generate_signed_url(
            expiration=expiration,
            method=method_norm,
            content_type='',
            generation=generation,
            response_disposition=content_disposition,
            response_type=response_type)

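    # Example (a minimal sketch):
    #
    #   download_url = storage.generate_blob_download_url(blob, expires=3600)
    #   # The signed URL can be fetched with any HTTP client until it
    #   # expires.
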
    _CREDENTIALS_ENV_NAME = 'GOOGLE_APPLICATION_CREDENTIALS'
    _OBJECT_META_PREFIX = 'x-goog-meta-'

    #: `insert-object
    #: <https://cloud.google.com/storage/docs/json_api/v1/objects/insert>`_
    #: Mapping is for blob class attribute names
    _PUT_OBJECT_KEYS = {
        'acl': 'acl',
        'bucket': 'bucket',
        'cache_control': 'cache_control',
        'content_disposition': 'content_disposition',
        'content_encoding': 'content_encoding',
        'content_length': 'content_length',
        'content_type': 'content_type',
        'expires': 'expires',
        'meta_data': 'metadata',
    }

    #: `post-object
    #: <https://cloud.google.com/storage/docs/xml-api/post-object>`_
    _POST_OBJECT_KEYS = {
        'acl': 'acl',
        'bucket': 'bucket',
        'cache_control': 'Cache-Control',
        'content_disposition': 'Content-Disposition',
        'content_encoding': 'Content-Encoding',
        'content_length': 'Content-Length',
        'content_type': 'Content-Type',
        'expires': 'Expires',
        'key': 'Key',
        'success_action_redirect': 'success_action_redirect',
        'success_action_status': 'success_action_status',
        'meta_data': 'Metadata',
        'x_goog_meta_': 'x-goog-meta-',
        'content_length_range': 'content-length-range',
    }

    #: `get-object
    #: <https://cloud.google.com/storage/docs/xml-api/get-object>`_
    _GET_OBJECT_KEYS = {
        'content_disposition': 'response_disposition',
        'content_type': 'response_type',
        # Kept so `generate_blob_download_url` can read back `version` as
        # the blob generation.
        'version': 'version',
    }