"""Helper methods for Cloud Storage."""
import hashlib
import mimetypes
import os
from _hashlib import HASH
from typing import BinaryIO, Dict, Generator, Optional, TextIO, Tuple, Union
import magic # type: ignore
from cloudstorage.typed import FileLike
def read_in_chunks(
    file_object: Union[BinaryIO, TextIO], block_size: int = 4096
) -> Generator[Union[bytes, str], None, None]:
    """Return a generator which yields data in chunks.

    Works for both binary and text streams: iteration stops at the first
    empty read, whether that is ``b""`` or ``""``.

    Source: `read-file-in-chunks-ram-usage-read-strings-from-binary-file
    <https://stackoverflow.com/questions/17056382/
    read-file-in-chunks-ram-usage-read-strings-from-binary-files>`_

    :param file_object: File object to read in chunks.
    :type file_object: file object

    :param block_size: (optional) Chunk size passed to ``read()``.
    :type block_size: int

    :yield: The next chunk in file object.
    :yield type: `bytes` or `str`
    """
    while True:
        chunk = file_object.read(block_size)
        # A falsy chunk marks end-of-stream. Comparing against ``b""`` only
        # would never match the ``""`` returned by text-mode streams and
        # would loop forever.
        if not chunk:
            break
        yield chunk
def file_checksum(
    filename: FileLike, hash_type: str = "md5", block_size: int = 4096
) -> HASH:
    """Returns checksum for file.

    .. code-block:: python

        from cloudstorage.helpers import file_checksum

        picture_path = '/path/picture.png'
        file_checksum(picture_path, hash_type='sha256').hexdigest()
        # '03ef90ba683795018e541ddfb0ae3e958a359ee70dd4fccc7e747ee29b5df2f8'

    Source: `get-md5-hash-of-big-files-in-python <https://stackoverflow.com/
    questions/1131220/get-md5-hash-of-big-files-in-python>`_

    :param filename: File path or stream. A stream is read from its current
      position and, if seekable, rewound to position ``0`` afterwards.
    :type filename: str or FileLike

    :param hash_type: Hash algorithm name understood by :func:`hashlib.new`.
    :type hash_type: str

    :param block_size: (optional) Chunk size.
    :type block_size: int

    :return: Hash of file.
    :rtype: :class:`_hashlib.HASH`

    :raise RuntimeError: If the hash algorithm is not found in :mod:`hashlib`.

    .. versionchanged:: 0.4
      Returns :class:`_hashlib.HASH` instead of `HASH.hexdigest()`.
    """
    try:
        # ``hashlib.new`` only accepts real algorithm names. A plain
        # ``getattr(hashlib, hash_type)()`` would also pick up unrelated
        # module attributes (e.g. ``new``, ``algorithms_available``) and fail
        # with ``TypeError`` instead of the documented ``RuntimeError``.
        file_hash = hashlib.new(hash_type)
    except ValueError as err:
        raise RuntimeError(
            "Invalid or unsupported hash type: %s" % hash_type
        ) from err

    if isinstance(filename, str):
        # A path: open it ourselves in binary mode and close it when done.
        with open(filename, "rb") as file_:
            for chunk in read_in_chunks(file_, block_size=block_size):
                file_hash.update(chunk)
    else:
        # A caller-owned stream: consume it but leave it open.
        for chunk in read_in_chunks(filename, block_size=block_size):
            file_hash.update(chunk)

        # rewind the stream so it can be re-read later
        if filename.seekable():
            filename.seek(0)

    return file_hash
def validate_file_or_path(filename: FileLike) -> Optional[str]:
    """Return filename from file path or from file like object.

    Source: `rackspace/pyrax/object_storage.py <https://github.com/pycontribs/
    pyrax/blob/master/pyrax/object_storage.py>`_

    :param filename: File path or file like object.
    :type filename: str or file

    :return: Base name of the path or of the object's ``name`` attribute;
      ``None`` when a file like object has no ``name``.
    :rtype: str or None

    :raise FileNotFoundError: If the file path is invalid.
    """
    if not isinstance(filename, str):
        # File like object: best effort, streams without a name yield None.
        try:
            return os.path.basename(str(filename.name))
        except AttributeError:
            return None

    # String path: it must exist on disk.
    if os.path.exists(filename):
        return os.path.basename(filename)

    raise FileNotFoundError(filename)
def file_content_type(filename: FileLike) -> Optional[str]:
    """Guess content type for file path or file like object.

    Resolution order:

    * existing file path: delegated to ``magic.from_file(..., mime=True)``;
    * any other string: guessed from the name with :mod:`mimetypes`;
    * file like object: guessed from its name (if it has one) with
      :mod:`mimetypes`.

    :param filename: File path or file like object.
    :type filename: str or file

    :return: Content type, or ``None`` when it cannot be determined.
    :rtype: str or None
    """
    if not isinstance(filename, str):
        # Stream (e.g. BufferedReader): only its name, if any, is used.
        stream_name = validate_file_or_path(filename)
        if not stream_name:
            return None
        return mimetypes.guess_type(stream_name)[0]

    if os.path.isfile(filename):
        return magic.from_file(filename=filename, mime=True)

    # Not a file on disk: fall back to the extension.
    return mimetypes.guess_type(filename)[0]
def parse_content_disposition(data: str) -> Tuple[Optional[str], Dict]:
    """Parse Content-Disposition header.

    Example: ::

        >>> parse_content_disposition('inline')
        ('inline', {})

        >>> parse_content_disposition('attachment; filename="foo.html"')
        ('attachment', {'filename': 'foo.html'})

    The disposition type and the parameter names are lower-cased; parameter
    values keep their case and have backslashes removed.

    Source: `pyrates/multifruits <https://github.com/pyrates/multifruits>`_

    :param data: Content-Disposition header value.
    :type data: str

    :return: Disposition type (``None`` for an empty header) and fields.
    :rtype: tuple
    """
    dtype = None
    params = {}
    length = len(data)
    # ``data[start:end]`` is the token currently being accumulated.
    start = 0
    end = 0
    i = 0
    quoted = False
    previous = ""
    field = None

    while i < length:
        c = data[i]
        if not quoted and c == ";":
            # End of a segment: either the disposition type or a parameter.
            if dtype is None:
                # Lower-cased here as well as at end of input, so the type
                # is normalized whether or not parameters follow.
                dtype = data[start:end].lower()
            elif field is not None:
                params[field.lower()] = data[start:end].replace("\\", "")
                field = None
            i += 1
            start = end = i
        elif c == '"':
            i += 1
            if not previous or previous != "\\":
                # Unescaped quote: toggles quoting; an opening quote starts
                # the token just after it.
                if not quoted:
                    start = i
                quoted = not quoted
            else:
                # Escaped quote: part of the value.
                end = i
        elif c == "=" and not quoted:
            # Name/value separator. Inside quotes "=" is ordinary data,
            # otherwise ``filename="a=b.txt"`` would be parsed as field "a".
            field = data[start:end]
            i += 1
            start = end = i
        elif c == " ":
            i += 1
            if not quoted and start == end:  # Leading spaces.
                start = end = i
        else:
            i += 1
            end = i
        previous = c

    # Flush the last segment (nothing to flush for an empty header).
    if i:
        if dtype is None:
            dtype = data[start:end].lower()
        elif field is not None:
            params[field.lower()] = data[start:end].replace("\\", "")

    return dtype, params