# Source code for cloudstorage.drivers.minio

"""Minio Driver."""
import logging
import os
import re
from datetime import datetime, timedelta
from typing import Dict, Iterable, List  # noqa: F401
from urllib.parse import quote, urljoin

from inflection import camelize, underscore
from minio import Minio, PostPolicy, definitions
from minio.error import (
    BucketAlreadyExists,
    BucketAlreadyOwnedByYou,
    BucketNotEmpty,
    InvalidAccessKeyId,
    InvalidBucketError,
    InvalidBucketName,
    NoSuchKey,
    ResponseError,
    SignatureDoesNotMatch,
)

from cloudstorage import Blob, Container, Driver, messages
from cloudstorage.exceptions import (
    CloudStorageError,
    CredentialsError,
    IsNotEmptyError,
    NotFoundError,
)
from cloudstorage.helpers import (
    file_content_type,
    validate_file_or_path,
)
from cloudstorage.typed import (
    ContentLength,
    ExtraOptions,
    FileLike,
    FormPost,
    MetaData,
)

# Public API of this module.
__all__ = ['MinioDriver']

logger = logging.getLogger(__name__)

# Region identifiers accepted by the driver; exposed through
# :attr:`MinioDriver.regions`.
_REGIONS = [
    'us-east-1',
    'us-west-1',
    'us-west-2',
    'eu-west-1',
    'eu-central-1',
    'ap-southeast-1',
    'ap-southeast-2',
    'ap-northeast-1',
    'ap-northeast-2',
    'sa-east-1',
    'cn-north-1',
]


class MinioDriver(Driver):
    """Driver for interacting with any Minio compatible object storage
    server.

    .. code-block:: python

        from cloudstorage.drivers.minio import MinioDriver

        storage = MinioDriver(endpoint='minioserver:port',
                              key='<my-access-key-id>',
                              secret='<my-secret-access-key>',
                              region='us-east-1',
                              secure=True)
        # <Driver: Minio us-east-1>

    References:

    * `Python Client API Reference
      <https://docs.minio.io/docs/python-client-api-reference.html>`_
    * `Minio Python Library <https://github.com/minio/minio-py>`_

    :param endpoint: Minio server to connect to.
    :type endpoint: str

    :param key: Minio access key.
    :type key: str

    :param secret: Minio secret key.
    :type secret: str

    :param region: (optional) Region to connect to. Defaults to `us-east-1`.
    :type region: str

    :param kwargs: (optional) Extra driver options.

        * secure (`bool`): Use secure connection.
        * http_client (:class:`urllib3.poolmanager.PoolManager`): Use custom
          http client.
    :type kwargs: dict
    """

    name = 'MINIO'
    url = 'https://www.minio.io'

    def __init__(self, endpoint: str, key: str, secret: str = None,
                 region: str = 'us-east-1', **kwargs: Dict) -> None:
        # ``secure`` and ``http_client`` are consumed here; remaining
        # kwargs are forwarded to the base Driver.
        self._client = Minio(
            endpoint,
            access_key=key,
            secret_key=secret,
            secure=kwargs.pop('secure', True),
            region=region,
            http_client=kwargs.pop('http_client', None),
        )
        super().__init__(key=key, secret=secret, region=region, **kwargs)
[docs] def __iter__(self) -> Iterable[Container]: for bucket in self.client.list_buckets(): yield self._make_container(bucket)
[docs] def __len__(self) -> int: buckets = [bucket for bucket in self.client.list_buckets()] return len(buckets)
@staticmethod def _normalize_parameters(params: Dict[str, str], normalizers: Dict[str, str]) -> Dict[str, str]: normalized = params.copy() for key, value in params.items(): normalized.pop(key) if not value: continue key_inflected = camelize(underscore(key), uppercase_first_letter=True) # Only include parameters found in normalizers key_overrider = normalizers.get(key_inflected.lower()) if key_overrider: normalized[key_overrider] = value return normalized def _get_bucket(self, bucket_name: str) -> definitions.Bucket: """Get a Minio bucket. :param bucket_name: The Bucket's name identifier. :type bucket_name: str :return: Bucket resource object. :rtype: :class:`minio.definitions.Bucket` :raises NotFoundError: If the bucket does not exist. """ for bucket in self.client.list_buckets(): if bucket.name == bucket_name: return bucket raise NotFoundError(messages.CONTAINER_NOT_FOUND % bucket_name) def _make_obj(self, container: Container, obj: definitions.Object) -> Blob: """Convert Minio Object to Blob instance. :param container: The container that holds the blob. :type container: :class:`.Container` :param obj: Minio object stats. :type obj: :class:`minio.definitions.Object` :return: A blob object. :rtype: :class:`.Blob` """ obj_metadata = {} if obj.metadata is None else obj.metadata meta_data = {} for name, value in obj_metadata.items(): meta_key = re.sub(r'\b%s\b' % re.escape(self._OBJECT_META_PREFIX), '', name, flags=re.IGNORECASE) if meta_key != name: # Content-Type key is in the obj meta data meta_data[meta_key] = value return Blob(name=obj.object_name, checksum='', etag=obj.etag, size=obj.size, container=container, driver=self, acl={}, meta_data=meta_data, content_disposition=None, content_type=obj.content_type, cache_control=None, created_at=None, modified_at=obj.last_modified, expires_at=None) def _make_container(self, bucket: definitions.Bucket) -> Container: """Convert Minio Bucket to Container. :param bucket: Minio bucket. 
:type bucket: :class:`minio.definitions.Bucket` :return: The container if it exists. :rtype: :class:`.Container` """ created_at = bucket.creation_date.astimezone(tz=None) return Container(name=bucket.name, driver=self, acl='', meta_data=None, created_at=created_at) @property def client(self) -> Minio: """Minio client session. :return: Minio client session. :rtype: :class:`minio.Minio` """ return self._client
[docs] def validate_credentials(self) -> None: try: for _ in self.client.list_buckets(): break except (InvalidAccessKeyId, SignatureDoesNotMatch) as err: raise CredentialsError(str(err))
@property def regions(self) -> List[str]: return _REGIONS
[docs] def create_container(self, container_name: str, acl: str = None, meta_data: MetaData = None) -> Container: if meta_data: logger.info(messages.OPTION_NOT_SUPPORTED, 'meta_data') if acl: logger.info(messages.OPTION_NOT_SUPPORTED, 'acl') try: self.client.make_bucket(container_name) except (BucketAlreadyExists, BucketAlreadyOwnedByYou): pass except (InvalidBucketName, InvalidBucketError, ResponseError) as err: raise CloudStorageError(err.message) bucket = self._get_bucket(container_name) return self._make_container(bucket)
[docs] def get_container(self, container_name: str) -> Container: bucket = self._get_bucket(container_name) return self._make_container(bucket)
[docs] def patch_container(self, container: Container) -> None: raise NotImplementedError
[docs] def delete_container(self, container: Container) -> None: bucket = self._get_bucket(container.name) try: self.client.remove_bucket(container.name) except BucketNotEmpty: raise IsNotEmptyError(messages.CONTAINER_NOT_EMPTY % bucket.name)
[docs] def container_cdn_url(self, container: Container) -> str: return '%s/%s' % (self.client._endpoint_url, container.name)
[docs] def enable_container_cdn(self, container: Container) -> bool: logger.warning(messages.FEATURE_NOT_SUPPORTED, 'enable_container_cdn') return False
[docs] def disable_container_cdn(self, container: Container) -> bool: logger.warning(messages.FEATURE_NOT_SUPPORTED, 'disable_container_cdn') return False
[docs] def upload_blob(self, container: Container, filename: FileLike, blob_name: str = None, acl: str = None, meta_data: MetaData = None, content_type: str = None, content_disposition: str = None, cache_control: str = None, chunk_size: int = 1024, extra: ExtraOptions = None) -> Blob: meta_data = {} if meta_data is None else meta_data extra = {} if extra is None else extra blob_name = blob_name or validate_file_or_path(filename) if not content_type: if isinstance(filename, str): content_type = file_content_type(filename) else: content_type = file_content_type(blob_name) if isinstance(filename, str): self.client.fput_object(container.name, blob_name, filename, content_type=content_type, metadata=meta_data) else: length = extra.pop('length', len(filename.read())) filename.seek(0) self.client.put_object(container.name, blob_name, filename, length, content_type=content_type, metadata=meta_data) return self.get_blob(container, blob_name)
[docs] def get_blob(self, container: Container, blob_name: str) -> Blob: try: obj = self.client.stat_object(container.name, blob_name) except NoSuchKey: raise NotFoundError(messages.BLOB_NOT_FOUND % (blob_name, container.name)) return self._make_obj(container, obj)
[docs] def get_blobs(self, container: Container) -> Iterable[Blob]: for obj in self.client.list_objects(container.name, recursive=False): yield self._make_obj(container, obj)
[docs] def download_blob(self, blob: Blob, destination: FileLike) -> None: data = self.client.get_object(blob.container.name, blob.name) if isinstance(destination, str): with open(destination, 'wb') as blob_data: for d in data.stream(4096): blob_data.write(d) else: for d in data.stream(4096): destination.write(d)
[docs] def patch_blob(self, blob: Blob) -> None: raise NotImplementedError
[docs] def delete_blob(self, blob: Blob) -> None: try: self.client.remove_object(blob.container.name, blob.name) except ResponseError as err: raise CloudStorageError(err.message)
[docs] def blob_cdn_url(self, blob: Blob) -> str: container_url = self.container_cdn_url(blob.container) blob_name_cleaned = quote(blob.name) blob_path = '%s/%s' % (container_url, blob_name_cleaned) url = urljoin(container_url, blob_path) return url
[docs] def generate_container_upload_url(self, container: Container, blob_name: str, expires: int = 3600, acl: str = None, meta_data: MetaData = None, content_disposition: str = None, content_length: ContentLength = None, content_type: str = None, cache_control: str = None, extra: ExtraOptions = None) -> FormPost: if content_disposition: logger.warning(messages.OPTION_NOT_SUPPORTED, 'content_disposition') if cache_control: logger.warning(messages.OPTION_NOT_SUPPORTED, 'cache_control') meta_data = {} if meta_data is None else meta_data post_policy = PostPolicy() post_policy.set_bucket_name(container.name) post_policy.set_key_startswith(blob_name) if content_length: min_range, max_range = content_length post_policy.set_content_length_range(min_range, max_range) if content_type: post_policy.set_content_type(content_type) for meta_name, meta_value in meta_data.items(): meta_name = self._OBJECT_META_PREFIX + meta_name post_policy.policies.append(('eq', '$%s' % meta_name, meta_value)) post_policy.form_data[meta_name] = meta_value expires_date = datetime.utcnow() + timedelta(seconds=expires) post_policy.set_expires(expires_date) url, fields = self.client.presigned_post_policy(post_policy) return {'url': url, 'fields': fields}
[docs] def generate_blob_download_url(self, blob: Blob, expires: int = 3600, method: str = 'GET', content_disposition: str = None, extra: ExtraOptions = None) -> str: extra = {} if extra is None else extra response_headers = self._normalize_parameters(extra, self._GET_OBJECT_KEYS) if content_disposition: response_headers.setdefault('response-content-disposition', content_disposition) expires = timedelta(seconds=int(expires)) url = self.client.presigned_get_object(blob.container.name, blob.name, expires, response_headers) return url
_OBJECT_META_PREFIX = 'X-Amz-Meta-' # type: str #: `S3.Client.generate_presigned_post #: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html #: #S3.Client.generate_presigned_post>`_ _POST_OBJECT_KEYS = { 'acl': 'acl', 'cachecontrol': 'Cache-Control', 'contenttype': 'Content-Type', 'contentdisposition': 'Content-Disposition', 'contentencoding': 'Content-Encoding', 'expires': 'Expires', 'successactionredirect': 'success_action_redirect', 'redirect': 'redirect', 'successactionstatus': 'success_action_status', 'xamzmeta': 'x-amz-meta-', } #: `#S3.Client.get_object #: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html #: #S3.Client.get_object>`_ _GET_OBJECT_KEYS = { 'bucket': 'Bucket', 'ifmatch': 'IfMatch', 'ifmodifiedsince': 'IfModifiedSince', 'ifnonematch': 'IfNoneMatch', 'ifunmodifiedsince': 'IfUnmodifiedSince', 'key': 'Key', 'range': 'Range', 'responsecachecontrol': 'ResponseCacheControl', 'responsecontentdisposition': 'ResponseContentDisposition', 'responsecontentencoding': 'ResponseContentEncoding', 'responsecontentlanguage': 'ResponseContentLanguage', 'responsecontenttype': 'ResponseContentType', 'responseexpires': 'ResponseExpires', 'versionid': 'VersionId', 'ssecustomeralgorithm': 'SSECustomerAlgorithm', 'ssecustomerkey': 'SSECustomerKey', 'requestpayer': 'RequestPayer', 'partnumber': 'PartNumber', # Extra keys to standardize across all drivers 'cachecontrol': 'ResponseCacheControl', 'contentdisposition': 'ResponseContentDisposition', 'contentencoding': 'ResponseContentEncoding', 'contentlanguage': 'ResponseContentLanguage', 'contenttype': 'ResponseContentType', 'expires': 'ResponseExpires', } #: `S3.Client.put_object #: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html #: #S3.Client.put_object>`_ _PUT_OBJECT_KEYS = { 'acl': 'ACL', 'body': 'Body', 'bucket': 'Bucket', 'cachecontrol': 'CacheControl', 'contentdisposition': 'ContentDisposition', 'contentencoding': 'ContentEncoding', 'contentlanguage': 'ContentLanguage', 
'contentlength': 'ContentLength', 'contentmd5': 'ContentMD5', 'contenttype': 'ContentType', 'expires': 'Expires', 'grantfullcontrol': 'GrantFullControl', 'grantread': 'GrantRead', 'grantreadacp': 'GrantReadACP', 'grantwriteacp': 'GrantWriteACP', 'key': 'Key', 'metadata': 'Metadata', 'serversideencryption': 'ServerSideEncryption', 'storageclass': 'StorageClass', 'websiteredirectlocation': 'WebsiteRedirectLocation', 'ssecustomeralgorithm': 'SSECustomerAlgorithm', 'ssecustomerkey': 'SSECustomerKey', 'ssekmskeyid': 'SSEKMSKeyId', 'requestpayer': 'RequestPayer', 'tagging': 'Tagging', } #: `S3.Client.delete_object #: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html #: #S3.Client.delete_object>`_ _DELETE_OBJECT_KEYS = { 'bucket': 'Bucket', 'key': 'Key', 'mfa': 'MFA', 'versionid': 'VersionId', 'requestpayer': 'RequestPayer', } #: `S3.Bucket.create #: <http://boto3.readthedocs.io/en/latest/reference/services/s3.html #: #S3.Bucket.create>`_ _POST_CONTAINER_KEYS = { 'acl': 'ACL', 'bucket': 'Bucket', 'createbucketconfiguration': 'CreateBucketConfiguration', 'locationconstraint': 'LocationConstraint', 'grantfullcontrol': 'GrantFullControl', 'grantread': 'GrantRead', 'grantreadacp': 'GrantReadACP', 'grantwrite': 'GrantWrite', 'grantwriteacp': 'GrantWriteACP', }