"""Minio Driver."""
import logging
import re
from datetime import datetime, timedelta
from typing import Dict, Iterable, List # noqa: F401
from urllib.parse import quote, urljoin
from inflection import camelize, underscore
from minio import Minio, PostPolicy, definitions
from minio.error import (
BucketAlreadyExists,
BucketAlreadyOwnedByYou,
BucketNotEmpty,
InvalidAccessKeyId,
InvalidBucketError,
InvalidBucketName,
NoSuchKey,
ResponseError,
SignatureDoesNotMatch,
)
from cloudstorage import Blob, Container, Driver, messages
from cloudstorage.exceptions import (
CloudStorageError,
CredentialsError,
IsNotEmptyError,
NotFoundError,
)
from cloudstorage.helpers import (
file_content_type,
validate_file_or_path,
)
from cloudstorage.typed import (
ContentLength,
ExtraOptions,
FileLike,
FormPost,
MetaData,
)
# Public API of this module: only the driver class is exported.
__all__ = ["MinioDriver"]
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Region identifiers exposed through ``MinioDriver.regions``.
# NOTE(review): these look like AWS S3 region names -- confirm they match the
# regions the target Minio deployment accepts.
_REGIONS = [
"us-east-1",
"us-west-1",
"us-west-2",
"eu-west-1",
"eu-central-1",
"ap-southeast-1",
"ap-southeast-2",
"ap-northeast-1",
"ap-northeast-2",
"sa-east-1",
"cn-north-1",
]
class MinioDriver(Driver):
    """Driver for interacting with any Minio compatible object storage
    server.

    .. code-block:: python

        from cloudstorage.drivers.minio import MinioDriver

        storage = MinioDriver(endpoint='minioserver:port',
                              key='<my-access-key-id>',
                              secret='<my-secret-access-key>',
                              region='us-east-1',
                              secure=True)
        # <Driver: Minio us-east-1>

    References:

    * `Python Client API Reference
      <https://docs.minio.io/docs/python-client-api-reference.html>`_
    * `Minio Python Library <https://github.com/minio/minio-py>`_

    :param endpoint: Minio server to connect to.
    :type endpoint: str

    :param key: Minio access key.
    :type key: str

    :param secret: Minio secret key.
    :type secret: str

    :param region: (optional) Region to connect to. Defaults to `us-east-1`.
    :type region: str

    :param kwargs: (optional) Extra driver options.

      * secure (`bool`): Use secure connection.
      * http_client (:class:`urllib3.poolmanager.PoolManager`):
        Use custom http client.
    :type kwargs: dict
    """

    name = "MINIO"
    url = "https://www.minio.io"

    def __init__(
        self,
        endpoint: str,
        key: str,
        secret: str = None,
        region: str = "us-east-1",
        **kwargs: Dict,
    ) -> None:
        # Options consumed by the Minio client are popped from ``kwargs`` so
        # that only the remaining options are forwarded to the base driver.
        secure = kwargs.pop("secure", True)
        http_client = kwargs.pop("http_client", None)

        self._client = Minio(
            endpoint,
            access_key=key,
            secret_key=secret,
            secure=secure,
            region=region,
            http_client=http_client,
        )

        super().__init__(key=key, secret=secret, region=region, **kwargs)

    # Yield one Container per bucket reported by the Minio client.
    def __iter__(self) -> Iterable[Container]:
        for bucket in self.client.list_buckets():
            yield self._make_container(bucket)

    # Number of buckets reported by the Minio client.
    def __len__(self) -> int:
        return len(list(self.client.list_buckets()))

    @staticmethod
    def _normalize_parameters(
        params: Dict[str, str], normalizers: Dict[str, str]
    ) -> Dict[str, str]:
        """Translate caller supplied option names into normalized names.

        Each key of ``params`` is passed through ``underscore`` and
        ``camelize``, lower-cased, and looked up in ``normalizers``. Entries
        with a falsy value, or whose key has no match in ``normalizers``, are
        dropped.

        :param params: Options keyed by caller supplied names.
        :type params: dict

        :param normalizers: Mapping of lower-cased inflected names to the
          name that should be used in the result.
        :type normalizers: dict

        :return: New dictionary holding only the recognized options, keyed by
          their normalized names. ``params`` is not modified.
        :rtype: dict
        """
        # Build a fresh mapping instead of editing a copy of ``params``:
        # removing input keys from the result while also inserting normalized
        # keys could delete an entry that had already been normalized.
        normalized = {}  # type: Dict[str, str]

        for key, value in params.items():
            if not value:
                continue

            key_inflected = camelize(underscore(key), uppercase_first_letter=True)

            # Only include parameters found in normalizers
            key_overrider = normalizers.get(key_inflected.lower())
            if key_overrider:
                normalized[key_overrider] = value

        return normalized

    def _get_bucket(self, bucket_name: str) -> definitions.Bucket:
        """Get a Minio bucket.

        :param bucket_name: The Bucket's name identifier.
        :type bucket_name: str

        :return: Bucket resource object.
        :rtype: :class:`minio.definitions.Bucket`

        :raises NotFoundError: If the bucket does not exist.
        """
        # The bucket object is found by scanning the full bucket listing.
        for bucket in self.client.list_buckets():
            if bucket.name == bucket_name:
                return bucket

        raise NotFoundError(messages.CONTAINER_NOT_FOUND % bucket_name)

    def _make_obj(self, container: Container, obj: definitions.Object) -> Blob:
        """Convert Minio Object to Blob instance.

        :param container: The container that holds the blob.
        :type container: :class:`.Container`

        :param obj: Minio object stats.
        :type obj: :class:`minio.definitions.Object`

        :return: A blob object.
        :rtype: :class:`.Blob`
        """
        obj_metadata = {} if obj.metadata is None else obj.metadata

        # Compile once; the pattern does not change between metadata entries.
        prefix_pattern = re.compile(
            r"\b%s\b" % re.escape(self._OBJECT_META_PREFIX),
            flags=re.IGNORECASE,
        )

        meta_data = {}
        for name, value in obj_metadata.items():
            meta_key = prefix_pattern.sub("", name)
            # Entries whose name is left unchanged do not carry the meta data
            # prefix (the Content-Type key is in the obj meta data) and are
            # skipped.
            if meta_key != name:
                meta_data[meta_key] = value

        return Blob(
            name=obj.object_name,
            checksum="",
            etag=obj.etag,
            size=obj.size,
            container=container,
            driver=self,
            acl={},
            meta_data=meta_data,
            content_disposition=None,
            content_type=obj.content_type,
            cache_control=None,
            created_at=None,
            modified_at=obj.last_modified,
            expires_at=None,
        )

    def _make_container(self, bucket: definitions.Bucket) -> Container:
        """Convert Minio Bucket to Container.

        :param bucket: Minio bucket.
        :type bucket: :class:`minio.definitions.Bucket`

        :return: The container if it exists.
        :rtype: :class:`.Container`
        """
        # ``astimezone(tz=None)`` converts to the system local time zone.
        created_at = bucket.creation_date.astimezone(tz=None)
        return Container(
            name=bucket.name,
            driver=self,
            acl="",
            meta_data=None,
            created_at=created_at,
        )

    @property
    def client(self) -> Minio:
        """Minio client session.

        :return: Minio client session.
        :rtype: :class:`minio.Minio`
        """
        return self._client

    # Check the credentials by requesting the bucket listing; the two
    # authentication errors are translated into CredentialsError.
    def validate_credentials(self) -> None:
        try:
            # Consuming the first item is enough to trigger the request.
            for _ in self.client.list_buckets():
                break
        except (InvalidAccessKeyId, SignatureDoesNotMatch) as err:
            raise CredentialsError(str(err)) from err

    @property
    def regions(self) -> List[str]:
        # Hand out a copy so callers cannot modify the module-level list.
        return list(_REGIONS)

    # Create the bucket (an already existing bucket is not an error) and
    # return it as a Container. ``meta_data`` and ``acl`` are not applied.
    def create_container(
        self, container_name: str, acl: str = None, meta_data: MetaData = None
    ) -> Container:
        if meta_data:
            logger.info(messages.OPTION_NOT_SUPPORTED, "meta_data")

        if acl:
            logger.info(messages.OPTION_NOT_SUPPORTED, "acl")

        try:
            self.client.make_bucket(container_name)
        except (BucketAlreadyExists, BucketAlreadyOwnedByYou):
            # Deliberate: fall through and return the existing bucket.
            pass
        except (InvalidBucketName, InvalidBucketError, ResponseError) as err:
            raise CloudStorageError(str(err)) from err

        bucket = self._get_bucket(container_name)
        return self._make_container(bucket)

    # Look up an existing bucket; raises NotFoundError when it is missing.
    def get_container(self, container_name: str) -> Container:
        bucket = self._get_bucket(container_name)
        return self._make_container(bucket)

    def patch_container(self, container: Container) -> None:
        raise NotImplementedError

    # Remove the bucket. Raises NotFoundError when it does not exist and
    # IsNotEmptyError when it still holds objects.
    def delete_container(self, container: Container) -> None:
        bucket = self._get_bucket(container.name)

        try:
            self.client.remove_bucket(container.name)
        except BucketNotEmpty as err:
            raise IsNotEmptyError(
                messages.CONTAINER_NOT_EMPTY % bucket.name
            ) from err

    # URL of the container: "<client endpoint URL>/<container name>".
    def container_cdn_url(self, container: Container) -> str:
        # NOTE: reads the private ``_endpoint_url`` attribute of the client.
        return "%s/%s" % (self.client._endpoint_url, container.name)

    def enable_container_cdn(self, container: Container) -> bool:
        logger.warning(messages.FEATURE_NOT_SUPPORTED, "enable_container_cdn")
        return False

    def disable_container_cdn(self, container: Container) -> bool:
        logger.warning(messages.FEATURE_NOT_SUPPORTED, "disable_container_cdn")
        return False

    # Upload a local file (``filename`` is a path) or a file-like object and
    # return the stored blob. For file-like objects the upload size is taken
    # from ``extra["length"]`` when given, otherwise measured from the stream.
    # ``acl``, ``content_disposition`` and ``cache_control`` are not forwarded
    # to the client; ``chunk_size`` is unused.
    def upload_blob(
        self,
        container: Container,
        filename: FileLike,
        blob_name: str = None,
        acl: str = None,
        meta_data: MetaData = None,
        content_type: str = None,
        content_disposition: str = None,
        cache_control: str = None,
        chunk_size: int = 1024,
        extra: ExtraOptions = None,
    ) -> Blob:
        # Report options that would otherwise be dropped silently.
        if acl:
            logger.warning(messages.OPTION_NOT_SUPPORTED, "acl")

        if content_disposition:
            logger.warning(messages.OPTION_NOT_SUPPORTED, "content_disposition")

        if cache_control:
            logger.warning(messages.OPTION_NOT_SUPPORTED, "cache_control")

        meta_data = {} if meta_data is None else meta_data
        extra = {} if extra is None else extra

        blob_name = blob_name or validate_file_or_path(filename)

        if not content_type:
            # Guess from the local path when one was given, otherwise from
            # the blob name.
            guess_from = filename if isinstance(filename, str) else blob_name
            content_type = file_content_type(guess_from)

        if isinstance(filename, str):
            self.client.fput_object(
                container.name,
                blob_name,
                filename,
                content_type=content_type,
                metadata=meta_data,
            )
        else:
            if "length" in extra:
                # Caller supplied the size: the stream does not have to be
                # read, and the caller's ``extra`` dict is left untouched.
                length = extra["length"]
            else:
                import os

                # Measure the whole stream without loading it into memory.
                filename.seek(0, os.SEEK_END)
                length = filename.tell()

            # The upload always starts at the beginning of the stream.
            filename.seek(0)
            self.client.put_object(
                container.name,
                blob_name,
                filename,
                length,
                content_type=content_type,
                metadata=meta_data,
            )

        return self.get_blob(container, blob_name)

    # Fetch the object's stats and convert them into a Blob; raises
    # NotFoundError when the object does not exist.
    def get_blob(self, container: Container, blob_name: str) -> Blob:
        try:
            obj = self.client.stat_object(container.name, blob_name)
        except NoSuchKey:
            raise NotFoundError(messages.BLOB_NOT_FOUND % (blob_name, container.name))

        return self._make_obj(container, obj)

    # Yield a Blob for every entry of the container's non-recursive listing.
    def get_blobs(self, container: Container) -> Iterable[Blob]:
        for obj in self.client.list_objects(container.name, recursive=False):
            yield self._make_obj(container, obj)

    # Stream the blob into ``destination``, which is either a file path
    # (opened for binary writing) or a writable file-like object.
    def download_blob(self, blob: Blob, destination: FileLike) -> None:
        response = self.client.get_object(blob.container.name, blob.name)

        try:
            if isinstance(destination, str):
                with open(destination, "wb") as blob_data:
                    for chunk in response.stream(4096):
                        blob_data.write(chunk)
            else:
                for chunk in response.stream(4096):
                    destination.write(chunk)
        finally:
            # Always close the response and hand the connection back to the
            # pool, including when writing to the destination fails.
            response.close()
            response.release_conn()

    def patch_blob(self, blob: Blob) -> None:
        raise NotImplementedError

    # Remove the object; client response errors become CloudStorageError.
    def delete_blob(self, blob: Blob) -> None:
        try:
            self.client.remove_object(blob.container.name, blob.name)
        except ResponseError as err:
            raise CloudStorageError(str(err)) from err

    # URL of the blob: the container URL followed by the URL-quoted blob name.
    def blob_cdn_url(self, blob: Blob) -> str:
        container_url = self.container_cdn_url(blob.container)
        blob_name_cleaned = quote(blob.name)

        blob_path = "%s/%s" % (container_url, blob_name_cleaned)
        url = urljoin(container_url, blob_path)
        return url

    # Build a presigned POST policy for uploading into the container and
    # return it as ``{"url": ..., "fields": ...}``. ``content_disposition``
    # and ``cache_control`` are reported as unsupported; ``acl`` and ``extra``
    # are unused.
    def generate_container_upload_url(
        self,
        container: Container,
        blob_name: str,
        expires: int = 3600,
        acl: str = None,
        meta_data: MetaData = None,
        content_disposition: str = None,
        content_length: ContentLength = None,
        content_type: str = None,
        cache_control: str = None,
        extra: ExtraOptions = None,
    ) -> FormPost:
        if content_disposition:
            logger.warning(messages.OPTION_NOT_SUPPORTED, "content_disposition")

        if cache_control:
            logger.warning(messages.OPTION_NOT_SUPPORTED, "cache_control")

        meta_data = {} if meta_data is None else meta_data

        post_policy = PostPolicy()
        post_policy.set_bucket_name(container.name)
        post_policy.set_key_startswith(blob_name)

        if content_length:
            min_range, max_range = content_length
            post_policy.set_content_length_range(min_range, max_range)

        if content_type:
            post_policy.set_content_type(content_type)

        # Each meta data entry becomes an exact-match policy condition and a
        # form field, both named with the object meta data prefix.
        for meta_name, meta_value in meta_data.items():
            field_name = self._OBJECT_META_PREFIX + meta_name
            post_policy.policies.append(("eq", "$%s" % field_name, meta_value))
            post_policy.form_data[field_name] = meta_value

        # NOTE(review): a naive UTC datetime is passed on purpose here --
        # confirm what ``PostPolicy.set_expires`` expects before switching to
        # a timezone-aware value.
        expires_date = datetime.utcnow() + timedelta(seconds=expires)
        post_policy.set_expires(expires_date)

        url, fields = self.client.presigned_post_policy(post_policy)
        return {"url": url, "fields": fields}

    # Build a presigned GET URL for the blob, valid for ``expires`` seconds.
    # ``method`` is unused.
    def generate_blob_download_url(
        self,
        blob: Blob,
        expires: int = 3600,
        method: str = "GET",
        content_disposition: str = None,
        extra: ExtraOptions = None,
    ) -> str:
        extra = {} if extra is None else extra

        # NOTE(review): ``_GET_OBJECT_KEYS`` yields names such as
        # ``ResponseContentDisposition`` while the fallback below uses
        # ``response-content-disposition`` -- confirm which form
        # ``presigned_get_object`` expects.
        response_headers = self._normalize_parameters(extra, self._GET_OBJECT_KEYS)

        if content_disposition:
            response_headers.setdefault(
                "response-content-disposition", content_disposition
            )

        expires_delta = timedelta(seconds=int(expires))
        url = self.client.presigned_get_object(
            blob.container.name, blob.name, expires_delta, response_headers
        )
        return url

    _OBJECT_META_PREFIX = "X-Amz-Meta-"  # type: str

    #: `S3.Client.generate_presigned_post
    #: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html
    #: #S3.Client.generate_presigned_post>`_
    _POST_OBJECT_KEYS = {
        "acl": "acl",
        "cachecontrol": "Cache-Control",
        "contenttype": "Content-Type",
        "contentdisposition": "Content-Disposition",
        "contentencoding": "Content-Encoding",
        "expires": "Expires",
        "successactionredirect": "success_action_redirect",
        "redirect": "redirect",
        "successactionstatus": "success_action_status",
        "xamzmeta": "x-amz-meta-",
    }

    #: `#S3.Client.get_object
    #: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html
    #: #S3.Client.get_object>`_
    _GET_OBJECT_KEYS = {
        "bucket": "Bucket",
        "ifmatch": "IfMatch",
        "ifmodifiedsince": "IfModifiedSince",
        "ifnonematch": "IfNoneMatch",
        "ifunmodifiedsince": "IfUnmodifiedSince",
        "key": "Key",
        "range": "Range",
        "responsecachecontrol": "ResponseCacheControl",
        "responsecontentdisposition": "ResponseContentDisposition",
        "responsecontentencoding": "ResponseContentEncoding",
        "responsecontentlanguage": "ResponseContentLanguage",
        "responsecontenttype": "ResponseContentType",
        "responseexpires": "ResponseExpires",
        "versionid": "VersionId",
        "ssecustomeralgorithm": "SSECustomerAlgorithm",
        "ssecustomerkey": "SSECustomerKey",
        "requestpayer": "RequestPayer",
        "partnumber": "PartNumber",
        # Extra keys to standardize across all drivers
        "cachecontrol": "ResponseCacheControl",
        "contentdisposition": "ResponseContentDisposition",
        "contentencoding": "ResponseContentEncoding",
        "contentlanguage": "ResponseContentLanguage",
        "contenttype": "ResponseContentType",
        "expires": "ResponseExpires",
    }

    #: `S3.Client.put_object
    #: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html
    #: #S3.Client.put_object>`_
    _PUT_OBJECT_KEYS = {
        "acl": "ACL",
        "body": "Body",
        "bucket": "Bucket",
        "cachecontrol": "CacheControl",
        "contentdisposition": "ContentDisposition",
        "contentencoding": "ContentEncoding",
        "contentlanguage": "ContentLanguage",
        "contentlength": "ContentLength",
        "contentmd5": "ContentMD5",
        "contenttype": "ContentType",
        "expires": "Expires",
        "grantfullcontrol": "GrantFullControl",
        "grantread": "GrantRead",
        "grantreadacp": "GrantReadACP",
        "grantwriteacp": "GrantWriteACP",
        "key": "Key",
        "metadata": "Metadata",
        "serversideencryption": "ServerSideEncryption",
        "storageclass": "StorageClass",
        "websiteredirectlocation": "WebsiteRedirectLocation",
        "ssecustomeralgorithm": "SSECustomerAlgorithm",
        "ssecustomerkey": "SSECustomerKey",
        "ssekmskeyid": "SSEKMSKeyId",
        "requestpayer": "RequestPayer",
        "tagging": "Tagging",
    }

    #: `S3.Client.delete_object
    #: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html
    #: #S3.Client.delete_object>`_
    _DELETE_OBJECT_KEYS = {
        "bucket": "Bucket",
        "key": "Key",
        "mfa": "MFA",
        "versionid": "VersionId",
        "requestpayer": "RequestPayer",
    }

    #: `S3.Bucket.create
    #: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html
    #: #S3.Bucket.create>`_
    _POST_CONTAINER_KEYS = {
        "acl": "ACL",
        "bucket": "Bucket",
        "createbucketconfiguration": "CreateBucketConfiguration",
        "locationconstraint": "LocationConstraint",
        "grantfullcontrol": "GrantFullControl",
        "grantread": "GrantRead",
        "grantreadacp": "GrantReadACP",
        "grantwrite": "GrantWrite",
        "grantwriteacp": "GrantWriteACP",
    }