"""Microsoft Azure Storage Driver."""
import base64
import codecs
import logging
from datetime import datetime, timedelta
from typing import Dict, Iterable, List
from azure.common import (
AzureConflictHttpError,
AzureHttpError,
AzureMissingResourceHttpError,
)
from azure.storage.blob import BlockBlobService, PublicAccess
from azure.storage.blob.models import (
Blob as AzureBlob,
BlobPermissions,
Container as AzureContainer,
ContentSettings,
Include,
)
from inflection import underscore
from cloudstorage import Blob, Container, Driver, messages
from cloudstorage.exceptions import (
CloudStorageError,
CredentialsError,
IsNotEmptyError,
NotFoundError,
)
from cloudstorage.helpers import file_checksum, validate_file_or_path
from cloudstorage.typed import (
ContentLength,
ExtraOptions,
FileLike,
FormPost,
MetaData,
)
# Public API of this module: only the driver class is exported.
__all__ = ["AzureStorageDriver"]
# Module-level logger named after the package path (standard logging idiom).
logger = logging.getLogger(__name__)
class AzureStorageDriver(Driver):
    """Driver for interacting with Microsoft Azure Storage.

    .. code-block:: python

        from cloudstorage.drivers.microsoft import AzureStorageDriver

        storage = AzureStorageDriver(account_name='<my-azure-account-name>',
                                     key='<my-azure-account-key>')
        # <Driver: AZURE>

    .. todo: Support for container or blob encryption key.

    References:

    * `Blob Service REST API <https://docs.microsoft.com/en-us/rest/api/
      storageservices/blob-service-rest-api>`_
    * `Azure/azure-storage-python
      <https://github.com/Azure/azure-storage-python>`_
    * `Uploading files to Azure Storage using SAS
      <https://blogs.msdn.microsoft.com/azureossds/2015/03/30/
      uploading-files-to-azure-storage-using-sasshared-access-signature/>`_

    :param account_name: Azure storage account name.
    :type account_name: str

    :param key: Azure storage account key.
    :type key: str

    :param kwargs: (optional) Extra driver options.
    :type kwargs: dict
    """

    name = "AZURE"
    hash_type = "md5"
    url = "https://azure.microsoft.com/en-us/services/storage/"

    def __init__(self, account_name: str, key: str, **kwargs: Dict) -> None:
        super().__init__(key=key, **kwargs)
        # The block blob service performs every Azure API call for this driver.
        self._service = BlockBlobService(
            account_name=account_name, account_key=key, **kwargs
        )
[docs] def __iter__(self) -> Iterable[Container]:
azure_containers = self.service.list_containers(include_metadata=True)
for azure_container in azure_containers:
yield self._convert_azure_container(azure_container)
[docs] def __len__(self) -> int:
azure_containers = self.service.list_containers()
return len(azure_containers)
@staticmethod
def _normalize_parameters(
params: Dict[str, str], normalizers: Dict[str, str]
) -> Dict[str, str]:
normalized = params.copy()
for key, value in params.items():
normalized.pop(key)
if not value:
continue
key_inflected = underscore(key).lower()
# Only include parameters found in normalizers
key_overrider = normalizers.get(key_inflected)
if key_overrider:
normalized[key_overrider] = value
return normalized
def _get_azure_blob(self, container_name: str, blob_name: str) -> AzureBlob:
"""Get Azure Storage blob by container and blob name.
:param container_name: The name of the container that containers the
blob.
:type container_name: str
:param blob_name: The name of the blob to get.
:type blob_name: str
:return: The blob object if it exists.
:rtype: :class:`azure.storage.blob.models.Blob`
"""
try:
azure_blob = self.service.get_blob_properties(container_name, blob_name)
except AzureMissingResourceHttpError as err:
logger.debug(err)
raise NotFoundError(messages.BLOB_NOT_FOUND % (blob_name, container_name))
return azure_blob
def _convert_azure_blob(self, container: Container, azure_blob: AzureBlob) -> Blob:
"""Convert Azure Storage Blob to a Cloud Storage Blob.
:param container: Container instance.
:type container: :class:`.Container`
:param azure_blob: Azure Storage blob.
:type azure_blob: :class:`azure.storage.blob.models.Blob`
:return: Blob instance.
:rtype: :class:`.Blob`
"""
content_settings = azure_blob.properties.content_settings
if content_settings.content_md5:
# TODO: CODE: Move to helper since google uses it too.
md5_bytes = base64.b64decode(content_settings.content_md5)
try:
checksum = md5_bytes.hex()
except AttributeError:
# Python 3.4: 'bytes' object has no attribute 'hex'
checksum = codecs.encode(md5_bytes, "hex_codec").decode("ascii")
else:
logger.warning("Content MD5 not populated, content will not be validated")
checksum = None
return Blob(
name=azure_blob.name,
size=azure_blob.properties.content_length,
checksum=checksum,
etag=azure_blob.properties.etag,
container=container,
driver=self,
acl=None,
meta_data=azure_blob.metadata,
content_disposition=content_settings.content_disposition,
content_type=content_settings.content_type,
cache_control=content_settings.cache_control,
created_at=None,
modified_at=azure_blob.properties.last_modified,
expires_at=None,
)
def _get_azure_container(self, container_name: str) -> AzureContainer:
"""Get Azure Storage container by name.
:param container_name: The name of the container to get.
:type container_name: str
:return: The container matching the name provided.
:rtype: :class:`azure.storage.blob.models.Container`
"""
try:
azure_container = self.service.get_container_properties(container_name)
except AzureMissingResourceHttpError as err:
logger.debug(err)
raise NotFoundError(messages.CONTAINER_NOT_FOUND % container_name)
return azure_container
def _convert_azure_container(self, azure_container: AzureContainer) -> Container:
"""Convert Azure Storage container to Cloud Storage Container.
:param azure_container: The container to convert.
:type azure_container: :class:`azure.storage.blob.models.Container`
:return: A container instance.
:rtype: :class:`.Container`
"""
return Container(
name=azure_container.name,
driver=self,
acl=azure_container.properties.public_access,
meta_data=azure_container.metadata,
created_at=azure_container.properties.last_modified,
)
    @property
    def service(self) -> BlockBlobService:
        """The block blob service bound to this driver.

        Read-only accessor for the client created in ``__init__``.

        :return: Service for interacting with the Microsoft Azure Storage API.
        :rtype: :class:`azure.storage.blob.blockblobservice.BlockBlobService`
        """
        return self._service
[docs] def validate_credentials(self) -> None:
try:
for _ in self.service.list_containers():
break
except AzureHttpError as err:
raise CredentialsError(str(err))
    @property
    def regions(self) -> List[str]:
        """Regions supported by this driver.

        :return: Always an empty list; region selection is not exposed here.
        :rtype: list of str
        """
        logger.warning("Regions not supported.")
        return []
[docs] def create_container(
self, container_name: str, acl: str = None, meta_data: MetaData = None
) -> Container:
meta_data = meta_data if meta_data is not None else {}
# Review options: Off, Blob, Container
if acl == "container-public-access":
public_access = PublicAccess.Container
elif acl == "blob-public-access":
public_access = PublicAccess.Blob
else:
public_access = None
try:
self.service.create_container(
container_name,
metadata=meta_data,
public_access=public_access,
fail_on_exist=False,
)
except AzureConflictHttpError:
logger.debug(messages.CONTAINER_EXISTS, container_name)
except AzureHttpError as err:
logger.debug(err)
raise CloudStorageError(str(err))
azure_container = self._get_azure_container(container_name)
return self._convert_azure_container(azure_container)
[docs] def get_container(self, container_name: str) -> Container:
azure_container = self._get_azure_container(container_name)
return self._convert_azure_container(azure_container)
[docs] def patch_container(self, container: Container) -> None:
raise NotImplementedError
[docs] def delete_container(self, container: Container) -> None:
azure_container = self._get_azure_container(container.name)
azure_blobs = self.service.list_blobs(azure_container.name, num_results=1)
if len(azure_blobs.items) > 0:
raise IsNotEmptyError(messages.CONTAINER_NOT_EMPTY % azure_container.name)
self.service.delete_container(azure_container.name, fail_not_exist=False)
[docs] def container_cdn_url(self, container: Container) -> str:
azure_container = self._get_azure_container(container.name)
url = "{}://{}/{}".format(
self.service.protocol,
self.service.primary_endpoint,
azure_container.name,
)
return url
[docs] def enable_container_cdn(self, container: Container) -> bool:
logger.warning(messages.FEATURE_NOT_SUPPORTED, "enable_container_cdn")
return False
[docs] def disable_container_cdn(self, container: Container) -> bool:
logger.warning(messages.FEATURE_NOT_SUPPORTED, "disable_container_cdn")
return False
[docs] def upload_blob(
self,
container: Container,
filename: FileLike,
blob_name: str = None,
acl: str = None,
meta_data: MetaData = None,
content_type: str = None,
content_disposition: str = None,
cache_control: str = None,
chunk_size: int = 1024,
extra: ExtraOptions = None,
) -> Blob:
if acl:
logger.info(messages.OPTION_NOT_SUPPORTED, "acl")
meta_data = {} if meta_data is None else meta_data
extra = extra if extra is not None else {}
extra_args = self._normalize_parameters(extra, self._PUT_OBJECT_KEYS)
extra_args.setdefault("content_type", content_type)
extra_args.setdefault("content_disposition", content_disposition)
extra_args.setdefault("cache_control", cache_control)
azure_container = self._get_azure_container(container.name)
blob_name = blob_name or validate_file_or_path(filename)
# azure does not set content_md5 on backend
file_hash = file_checksum(filename, hash_type=self.hash_type)
file_digest = file_hash.digest()
checksum = base64.b64encode(file_digest).decode("utf-8").strip()
extra_args.setdefault("content_md5", checksum)
content_settings = ContentSettings(**extra_args)
if isinstance(filename, str):
self.service.create_blob_from_path(
container_name=azure_container.name,
blob_name=blob_name,
file_path=filename,
content_settings=content_settings,
metadata=meta_data,
validate_content=True,
)
else:
self.service.create_blob_from_stream(
container_name=azure_container.name,
blob_name=blob_name,
stream=filename,
content_settings=content_settings,
metadata=meta_data,
validate_content=True,
)
azure_blob = self._get_azure_blob(azure_container.name, blob_name)
return self._convert_azure_blob(container, azure_blob)
[docs] def get_blob(self, container: Container, blob_name: str) -> Blob:
azure_container = self._get_azure_container(container.name)
azure_blob = self._get_azure_blob(azure_container.name, blob_name)
return self._convert_azure_blob(container, azure_blob)
[docs] def get_blobs(self, container: Container) -> Iterable[Blob]:
azure_container = self._get_azure_container(container.name)
azure_blobs = self.service.list_blobs(
azure_container.name, include=Include(metadata=True)
)
for azure_blob in azure_blobs:
yield self._convert_azure_blob(container, azure_blob)
[docs] def download_blob(self, blob: Blob, destination: FileLike) -> None:
azure_blob = self._get_azure_blob(blob.container.name, blob.name)
if isinstance(destination, str):
self.service.get_blob_to_path(
container_name=blob.container.name,
blob_name=azure_blob.name,
file_path=destination,
)
else:
self.service.get_blob_to_stream(
container_name=blob.container.name,
blob_name=azure_blob.name,
stream=destination,
)
[docs] def patch_blob(self, blob: Blob) -> None:
pass
[docs] def delete_blob(self, blob: Blob) -> None:
azure_blob = self._get_azure_blob(blob.container.name, blob.name)
self.service.delete_blob(blob.container.name, azure_blob.name)
[docs] def blob_cdn_url(self, blob: Blob) -> str:
azure_blob = self._get_azure_blob(blob.container.name, blob.name)
url = self.service.make_blob_url(blob.container.name, azure_blob.name)
return url
[docs] def generate_container_upload_url(
self,
container: Container,
blob_name: str,
expires: int = 3600,
acl: str = None,
meta_data: MetaData = None,
content_disposition: str = None,
content_length: ContentLength = None,
content_type: str = None,
cache_control: str = None,
extra: ExtraOptions = None,
) -> FormPost:
if acl:
logger.info(messages.OPTION_NOT_SUPPORTED, "acl")
meta_data = meta_data if meta_data is not None else {}
extra = extra if extra is not None else {}
params = self._normalize_parameters(extra, self._POST_OBJECT_KEYS)
azure_container = self._get_azure_container(container.name)
expires_at = datetime.utcnow() + timedelta(seconds=expires)
sas_token = self.service.generate_container_shared_access_signature(
container_name=azure_container.name,
permission=BlobPermissions.WRITE,
expiry=expires_at,
content_disposition=content_disposition,
content_type=content_type,
**params,
)
headers = {
"x-ms-blob-type": "BlockBlob",
"x-ms-blob-content-type": content_type,
"x-ms-blob-content-disposition": content_disposition,
"x-ms-blob-cache-control": cache_control,
}
for meta_key, meta_value in meta_data.items():
key = self._OBJECT_META_PREFIX + meta_key
headers[key] = meta_value
upload_url = self.service.make_blob_url(
container_name=azure_container.name,
blob_name=blob_name,
sas_token=sas_token,
)
return {"url": upload_url, "fields": None, "headers": headers}
[docs] def generate_blob_download_url(
self,
blob: Blob,
expires: int = 3600,
method: str = "GET",
content_disposition: str = None,
extra: ExtraOptions = None,
) -> str:
extra = extra if extra is not None else {}
params = self._normalize_parameters(extra, self._GET_OBJECT_KEYS)
azure_blob = self._get_azure_blob(blob.container.name, blob.name)
content_type = params.get("content_type", None)
expires_at = datetime.utcnow() + timedelta(seconds=expires)
sas_token = self.service.generate_blob_shared_access_signature(
container_name=blob.container.name,
blob_name=azure_blob.name,
permission=BlobPermissions.READ,
expiry=expires_at,
content_disposition=content_disposition,
content_type=content_type,
**params,
)
download_url = self.service.make_blob_url(
container_name=blob.container.name,
blob_name=azure_blob.name,
sas_token=sas_token,
)
return download_url
_OBJECT_META_PREFIX = "x-ms-meta-"
#: `insert-object
#: <https://docs.microsoft.com/en-us/rest/api/storageservices/
# set-blob-properties>`
_PUT_OBJECT_KEYS = {} # type: Dict
#: `post-object
#: <https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob>`_
_POST_OBJECT_KEYS = {} # type: Dict
#: `get-object
#: <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob>`_
_GET_OBJECT_KEYS = {} # type: Dict