"""Microsoft Azure Storage Driver."""
import base64
import codecs
import logging
from datetime import datetime, timedelta
from typing import Dict, Iterable, List, Union

try:
    from http import HTTPStatus
except ImportError:
    # noinspection PyUnresolvedReferences
    from httpstatus import HTTPStatus

from azure.common import AzureMissingResourceHttpError
from azure.common import AzureHttpError
from azure.common import AzureConflictHttpError
from azure.storage.blob import PublicAccess
from azure.storage.blob import BlockBlobService
from azure.storage.blob.models import Blob as AzureBlob
from azure.storage.blob.models import Container as AzureContainer
from azure.storage.blob.models import Include
from azure.storage.blob.models import ContentSettings
from azure.storage.blob.models import BlobPermissions
from inflection import underscore

from cloudstorage.base import Blob
from cloudstorage.base import Container
from cloudstorage.base import ContentLength
from cloudstorage.base import Driver
from cloudstorage.base import ExtraOptions
from cloudstorage.base import FileLike
from cloudstorage.base import FormPost
from cloudstorage.base import MetaData
from cloudstorage.exceptions import NotFoundError
from cloudstorage.exceptions import CloudStorageError
from cloudstorage.exceptions import IsNotEmptyError
from cloudstorage.helpers import file_checksum, validate_file_or_path
from cloudstorage.messages import CONTAINER_NOT_FOUND
from cloudstorage.messages import CONTAINER_EXISTS
from cloudstorage.messages import CONTAINER_NOT_EMPTY
from cloudstorage.messages import BLOB_NOT_FOUND
from cloudstorage.messages import FEATURE_NOT_SUPPORTED
from cloudstorage.messages import OPTION_NOT_SUPPORTED

logger = logging.getLogger(__name__)


class AzureStorageDriver(Driver):
    """Driver for interacting with Microsoft Azure Storage.

    .. code-block:: python

        from cloudstorage.drivers.microsoft import AzureStorageDriver

        storage = AzureStorageDriver(account_name='<my-azure-account-name>',
                                     key='<my-azure-account-key>')
        # <Driver: AZURE>

    .. todo:: Support for container or blob encryption key.

    References:

    * `Blob Service REST API <https://docs.microsoft.com/en-us/rest/api/
      storageservices/blob-service-rest-api>`_
    * `Azure/azure-storage-python
      <https://github.com/Azure/azure-storage-python>`_
    * `Uploading files to Azure Storage using SAS
      <https://blogs.msdn.microsoft.com/azureossds/2015/03/30/
      uploading-files-to-azure-storage-using-sasshared-access-signature/>`_

    :param account_name: Azure storage account name.
    :type account_name: str

    :param key: Azure storage account key.
    :type key: str

    :param kwargs: (optional) Catch invalid options.
    :type kwargs: dict
    """
    name = 'AZURE'
    hash_type = 'md5'
    url = 'https://azure.microsoft.com/en-us/services/storage/'

    def __init__(self, account_name: str = None, key: str = None,
                 **kwargs: Dict) -> None:
        super().__init__(key=key)

        self._service = BlockBlobService(account_name=account_name,
                                         account_key=key, **kwargs)

    def __iter__(self) -> Iterable[Container]:
        azure_containers = self.service.list_containers(include_metadata=True)
        for azure_container in azure_containers:
            yield self._convert_azure_container(azure_container)

    def __len__(self) -> int:
        # list_containers returns a generator-like object; count the
        # containers returned in its first page of results.
        azure_containers = self.service.list_containers()
        return len(azure_containers.items)

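    # A minimal usage sketch (hypothetical credentials; ``storage`` is an
    # illustrative name, not defined in this module):
    #
    #     storage = AzureStorageDriver(account_name='<account>', key='<key>')
    #     for container in storage:    # calls __iter__
    #         print(container.name)
    #     total = len(storage)         # calls __len__
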
    @staticmethod
    def _normalize_parameters(params: Dict[str, str],
                              normalizers: Dict[str, str]) -> Dict[str, str]:
        normalized = params.copy()

        for key, value in params.items():
            normalized.pop(key)
            if not value:
                continue

            key_inflected = underscore(key).lower()

            # Only include parameters found in normalizers
            key_overrider = normalizers.get(key_inflected)
            if key_overrider:
                normalized[key_overrider] = value

        return normalized

    def _get_azure_blob(self, container_name: str,
                        blob_name: str) -> AzureBlob:
        """Get Azure Storage blob by container and blob name.

        :param container_name: The name of the container that contains the
          blob.
        :type container_name: str

        :param blob_name: The name of the blob to get.
        :type blob_name: str

        :return: The blob object if it exists.
        :rtype: :class:`azure.storage.blob.models.Blob`
        """
        try:
            azure_blob = self.service.get_blob_properties(container_name,
                                                          blob_name)
        except AzureMissingResourceHttpError as err:
            logger.debug(err)
            raise NotFoundError(BLOB_NOT_FOUND % (blob_name, container_name))

        return azure_blob

    def _convert_azure_blob(self, container: Container,
                            azure_blob: AzureBlob) -> Blob:
        """Convert Azure Storage Blob to a Cloud Storage Blob.

        :param container: Container instance.
        :type container: :class:`.Container`

        :param azure_blob: Azure Storage blob.
        :type azure_blob: :class:`azure.storage.blob.models.Blob`

        :return: Blob instance.
        :rtype: :class:`.Blob`
        """
        content_settings = azure_blob.properties.content_settings

        # TODO: CODE: Move to helper since google uses it too.
        md5_bytes = base64.b64decode(content_settings.content_md5)

        try:
            checksum = md5_bytes.hex()
        except AttributeError:
            # Python 3.4: 'bytes' object has no attribute 'hex'
            checksum = codecs.encode(md5_bytes, 'hex_codec').decode('ascii')

        return Blob(name=azure_blob.name,
                    size=azure_blob.properties.content_length,
                    checksum=checksum,
                    etag=azure_blob.properties.etag,
                    container=container,
                    driver=self,
                    acl=None,
                    meta_data=azure_blob.metadata,
                    content_disposition=content_settings.content_disposition,
                    content_type=content_settings.content_type,
                    created_at=None,
                    modified_at=azure_blob.properties.last_modified,
                    expires_at=None)

    def _get_azure_container(self, container_name: str) -> AzureContainer:
        """Get Azure Storage container by name.

        :param container_name: The name of the container to get.
        :type container_name: str

        :return: The container matching the name provided.
        :rtype: :class:`azure.storage.blob.models.Container`
        """
        try:
            azure_container = self.service.get_container_properties(
                container_name)
        except AzureMissingResourceHttpError as err:
            logger.debug(err)
            raise NotFoundError(CONTAINER_NOT_FOUND % container_name)

        return azure_container

    def _convert_azure_container(self,
                                 azure_container: AzureContainer) -> Container:
        """Convert Azure Storage container to Cloud Storage Container.

        :param azure_container: The container to convert.
        :type azure_container: :class:`azure.storage.blob.models.Container`

        :return: A container instance.
        :rtype: :class:`.Container`
        """
        return Container(name=azure_container.name,
                         driver=self,
                         acl=azure_container.properties.public_access,
                         meta_data=azure_container.metadata,
                         created_at=azure_container.properties.last_modified)

    @property
    def service(self) -> BlockBlobService:
        """The block blob service bound to this driver.

        :return: Service for interacting with the Microsoft Azure Storage API.
        :rtype: :class:`azure.storage.blob.blockblobservice.BlockBlobService`
        """
        return self._service

    @property
    def regions(self) -> List[str]:
        logger.warning('Regions not supported.')
        return []

    def create_container(self, container_name: str, acl: str = None,
                         meta_data: MetaData = None) -> Container:
        meta_data = meta_data if meta_data is not None else {}

        # Review options: Off, Blob, Container
        if acl == 'container-public-access':
            public_access = PublicAccess.Container
        elif acl == 'blob-public-access':
            public_access = PublicAccess.Blob
        else:
            public_access = None

        try:
            self.service.create_container(container_name,
                                          metadata=meta_data,
                                          public_access=public_access,
                                          fail_on_exist=False)
        except AzureConflictHttpError:
            logger.debug(CONTAINER_EXISTS, container_name)
        except AzureHttpError as err:
            logger.debug(err)
            raise CloudStorageError(str(err))

        azure_container = self._get_azure_container(container_name)
        return self._convert_azure_container(azure_container)

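    # A hedged example of creating a container with blob-level public read
    # access (container name and metadata below are illustrative):
    #
    #     container = storage.create_container('container-name',
    #                                          acl='blob-public-access',
    #                                          meta_data={'owner': 'team-x'})
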
    def get_container(self, container_name: str) -> Container:
        azure_container = self._get_azure_container(container_name)
        return self._convert_azure_container(azure_container)

    def patch_container(self, container: Container) -> None:
        raise NotImplementedError

    def delete_container(self, container: Container) -> None:
        azure_container = self._get_azure_container(container.name)

        azure_blobs = self.service.list_blobs(azure_container.name,
                                              num_results=1)
        if len(azure_blobs.items) > 0:
            raise IsNotEmptyError(CONTAINER_NOT_EMPTY % azure_container.name)

        self.service.delete_container(azure_container.name,
                                      fail_not_exist=False)

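    # Usage sketch (illustrative): deletion fails with IsNotEmptyError while
    # the container still holds blobs, so callers typically empty it first.
    #
    #     for blob in container:
    #         blob.delete()
    #     storage.delete_container(container)
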
    def container_cdn_url(self, container: Container) -> str:
        azure_container = self._get_azure_container(container.name)
        url = '{}://{}/{}'.format(
            self.service.protocol,
            self.service.primary_endpoint,
            azure_container.name,
        )
        return url

    def enable_container_cdn(self, container: Container) -> bool:
        logger.warning(FEATURE_NOT_SUPPORTED, 'enable_container_cdn')
        return False

    def disable_container_cdn(self, container: Container) -> bool:
        logger.warning(FEATURE_NOT_SUPPORTED, 'disable_container_cdn')
        return False

    def upload_blob(self, container: Container,
                    filename: Union[str, FileLike], blob_name: str = None,
                    acl: str = None, meta_data: MetaData = None,
                    content_type: str = None, content_disposition: str = None,
                    chunk_size: int = 1024,
                    extra: ExtraOptions = None) -> Blob:
        if acl:
            logger.info(OPTION_NOT_SUPPORTED, 'acl')

        meta_data = {} if meta_data is None else meta_data
        extra = extra if extra is not None else {}

        extra_args = self._normalize_parameters(extra, self._PUT_OBJECT_KEYS)
        extra_args.setdefault('content_type', content_type)
        extra_args.setdefault('content_disposition', content_disposition)

        azure_container = self._get_azure_container(container.name)
        blob_name = blob_name or validate_file_or_path(filename)

        # azure does not set content_md5 on backend
        file_hash = file_checksum(filename, hash_type=self.hash_type)
        file_digest = file_hash.digest()
        checksum = base64.b64encode(file_digest).decode('utf-8').strip()
        extra_args.setdefault('content_md5', checksum)

        content_settings = ContentSettings(**extra_args)

        if isinstance(filename, str):
            self.service.create_blob_from_path(
                container_name=azure_container.name,
                blob_name=blob_name,
                file_path=filename,
                content_settings=content_settings,
                metadata=meta_data,
                validate_content=True,
            )
        else:
            self.service.create_blob_from_stream(
                container_name=azure_container.name,
                blob_name=blob_name,
                stream=filename,
                content_settings=content_settings,
                metadata=meta_data,
                validate_content=True,
            )

        azure_blob = self._get_azure_blob(azure_container.name, blob_name)
        return self._convert_azure_blob(container, azure_blob)

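    # Usage sketch for uploads (paths and content type are illustrative; a
    # file-like object opened in binary mode also works):
    #
    #     blob = container.upload_blob('/path/to/picture.png',
    #                                  content_type='image/png',
    #                                  meta_data={'owner': 'team-x'})
    #
    #     with open('/path/to/picture.png', 'rb') as picture:
    #         blob = container.upload_blob(picture, blob_name='picture.png')
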
    def get_blob(self, container: Container, blob_name: str) -> Blob:
        azure_container = self._get_azure_container(container.name)
        azure_blob = self._get_azure_blob(azure_container.name, blob_name)
        return self._convert_azure_blob(container, azure_blob)

    def get_blobs(self, container: Container) -> Iterable[Blob]:
        azure_container = self._get_azure_container(container.name)
        azure_blobs = self.service.list_blobs(azure_container.name,
                                              include=Include(metadata=True))
        for azure_blob in azure_blobs:
            yield self._convert_azure_blob(container, azure_blob)

    def download_blob(self, blob: Blob,
                      destination: Union[str, FileLike]) -> None:
        azure_blob = self._get_azure_blob(blob.container.name, blob.name)

        if isinstance(destination, str):
            self.service.get_blob_to_path(
                container_name=blob.container.name,
                blob_name=azure_blob.name,
                file_path=destination,
            )
        else:
            self.service.get_blob_to_stream(
                container_name=blob.container.name,
                blob_name=azure_blob.name,
                stream=destination,
            )

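    # Usage sketch (destination path is illustrative; a writable binary
    # file-like object also works):
    #
    #     blob = container.get_blob('picture.png')
    #     blob.download('/tmp/picture-copy.png')
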
    def patch_blob(self, blob: Blob) -> None:
        pass

    def delete_blob(self, blob: Blob) -> None:
        azure_blob = self._get_azure_blob(blob.container.name, blob.name)
        self.service.delete_blob(blob.container.name, azure_blob.name)

    def blob_cdn_url(self, blob: Blob) -> str:
        azure_blob = self._get_azure_blob(blob.container.name, blob.name)
        url = self.service.make_blob_url(blob.container.name, azure_blob.name)
        return url

    def generate_container_upload_url(self, container: Container,
                                      blob_name: str, expires: int = 3600,
                                      acl: str = None,
                                      meta_data: MetaData = None,
                                      content_disposition: str = None,
                                      content_length: ContentLength = None,
                                      content_type: str = None,
                                      extra: ExtraOptions = None) -> FormPost:
        if acl:
            logger.info(OPTION_NOT_SUPPORTED, 'acl')

        meta_data = meta_data if meta_data is not None else {}
        extra = extra if extra is not None else {}

        params = self._normalize_parameters(extra, self._POST_OBJECT_KEYS)

        azure_container = self._get_azure_container(container.name)
        expires_at = datetime.utcnow() + timedelta(seconds=expires)

        sas_token = self.service.generate_container_shared_access_signature(
            container_name=azure_container.name,
            permission=BlobPermissions.WRITE,
            expiry=expires_at,
            content_disposition=content_disposition,
            content_type=content_type,
            **params
        )

        headers = {
            'x-ms-blob-type': 'BlockBlob',
            'x-ms-blob-content-type': content_type,
            'x-ms-blob-content-disposition': content_disposition
        }
        for meta_key, meta_value in meta_data.items():
            key = self._OBJECT_META_PREFIX + meta_key
            headers[key] = meta_value

        upload_url = self.service.make_blob_url(
            container_name=azure_container.name,
            blob_name=blob_name,
            sas_token=sas_token,
        )

        return {'url': upload_url, 'fields': None, 'headers': headers}

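    # A possible client-side use of the signed URL, assuming the third-party
    # ``requests`` package (not a dependency of this module); the blob is
    # uploaded with an HTTP PUT plus the returned SAS headers:
    #
    #     form_post = storage.generate_container_upload_url(
    #         container, 'avatar.png', expires=3600, content_type='image/png')
    #
    #     with open('avatar.png', 'rb') as data:
    #         requests.put(form_post['url'], data=data,
    #                      headers=form_post['headers'])
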
    def generate_blob_download_url(self, blob: Blob, expires: int = 3600,
                                   method: str = 'GET',
                                   content_disposition: str = None,
                                   extra: ExtraOptions = None) -> str:
        extra = extra if extra is not None else {}

        params = self._normalize_parameters(extra, self._GET_OBJECT_KEYS)

        azure_blob = self._get_azure_blob(blob.container.name, blob.name)
        content_type = params.get('content_type', None)
        expires_at = datetime.utcnow() + timedelta(seconds=expires)

        sas_token = self.service.generate_blob_shared_access_signature(
            container_name=blob.container.name,
            blob_name=azure_blob.name,
            permission=BlobPermissions.READ,
            expiry=expires_at,
            content_disposition=content_disposition,
            content_type=content_type,
            **params
        )

        download_url = self.service.make_blob_url(
            container_name=blob.container.name,
            blob_name=azure_blob.name,
            sas_token=sas_token,
        )
        return download_url

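    # Usage sketch (assumes ``requests``; names are illustrative): fetch the
    # blob anonymously with the time-limited signed URL.
    #
    #     download_url = storage.generate_blob_download_url(blob, expires=300)
    #     response = requests.get(download_url)
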
    _OBJECT_META_PREFIX = 'x-ms-meta-'

    #: `insert-object
    #: <https://docs.microsoft.com/en-us/rest/api/storageservices/
    #: set-blob-properties>`_
    _PUT_OBJECT_KEYS = {}  # type: Dict

    #: `post-object
    #: <https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob>`_
    _POST_OBJECT_KEYS = {}  # type: Dict

    #: `get-object
    #: <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob>`_
    _GET_OBJECT_KEYS = {}  # type: Dict