Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3)

Unified Diff: third_party/gsutil/gslib/hashing_helper.py

Issue 1377933002: [catapult] - Copy Telemetry's gsutilz over to third_party. (Closed) Base URL: https://github.com/catapult-project/catapult.git@master
Patch Set: Rename to gsutil. Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « third_party/gsutil/gslib/gcs_json_media.py ('k') | third_party/gsutil/gslib/help_provider.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: third_party/gsutil/gslib/hashing_helper.py
diff --git a/third_party/gsutil/gslib/hashing_helper.py b/third_party/gsutil/gslib/hashing_helper.py
new file mode 100644
index 0000000000000000000000000000000000000000..dee2f96c926d5ae01a4ff52b87f4966c3e472198
--- /dev/null
+++ b/third_party/gsutil/gslib/hashing_helper.py
@@ -0,0 +1,418 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Helper functions for hashing functionality."""
+
+import base64
+import binascii
+from hashlib import md5
+import os
+
+from boto import config
+import crcmod
+
+from gslib.exception import CommandException
+from gslib.util import DEFAULT_FILE_BUFFER_SIZE
+from gslib.util import MIN_SIZE_COMPUTE_LOGGING
+from gslib.util import TRANSFER_BUFFER_SIZE
+from gslib.util import UsingCrcmodExtension
+
+
+SLOW_CRCMOD_WARNING = """
+WARNING: You have requested checksumming but your crcmod installation isn't
+using the module's C extension, so checksumming will run very slowly. For help
+installing the extension, please see:
+ $ gsutil help crcmod
+"""
+
+
+_SLOW_CRCMOD_DOWNLOAD_WARNING = """
+WARNING: Downloading this composite object requires integrity checking with
+CRC32c, but your crcmod installation isn't using the module's C extension,
+so the hash computation will likely throttle download performance. For help
+installing the extension, please see:
+ $ gsutil help crcmod
+To disable slow integrity checking, see the "check_hashes" option in your
+boto config file.
+"""
+
+_SLOW_CRC_EXCEPTION_TEXT = """
+Downloading this composite object requires integrity checking with CRC32c,
+but your crcmod installation isn't using the module's C extension, so the
+hash computation will likely throttle download performance. For help
+installing the extension, please see:
+
+ $ gsutil help crcmod
+
+To download regardless of crcmod performance or to skip slow integrity
+checks, see the "check_hashes" option in your boto config file.
+
+NOTE: It is strongly recommended that you not disable integrity checks. Doing so
+could allow data corruption to go undetected during uploading/downloading."""
+
+
+_NO_HASH_CHECK_WARNING = """
+WARNING: This download will not be validated since your crcmod installation
+doesn't use the module's C extension, so the hash computation would likely
+throttle download performance. For help in installing the extension, please
+see:
+ $ gsutil help crcmod
+To force integrity checking, see the "check_hashes" option in your boto config
+file.
+"""
+
+
+# Configuration values for hashing.
+CHECK_HASH_IF_FAST_ELSE_FAIL = 'if_fast_else_fail'
+CHECK_HASH_IF_FAST_ELSE_SKIP = 'if_fast_else_skip'
+CHECK_HASH_ALWAYS = 'always'
+CHECK_HASH_NEVER = 'never'
+
+
+def _CalculateHashFromContents(fp, hash_alg):
+ """Calculates a base64 digest of the contents of a seekable stream.
+
+ This function resets the file pointer to position 0.
+
+ Args:
+ fp: An already-open file object.
+ hash_alg: Instance of hashing class initialized to start state.
+
+ Returns:
+ Hash of the stream in hex string format.
+ """
+ hash_dict = {'placeholder': hash_alg}
+ fp.seek(0)
+ CalculateHashesFromContents(fp, hash_dict)
+ fp.seek(0)
+ return hash_dict['placeholder'].hexdigest()
+
+
+def CalculateHashesFromContents(fp, hash_dict, callback_processor=None):
+ """Calculates hashes of the contents of a file.
+
+ Args:
+ fp: An already-open file object (stream will be consumed).
+ hash_dict: Dict of (string alg_name: initialized hashing class)
+ Hashing class will be populated with digests upon return.
+ callback_processor: Optional callback processing class that implements
+ Progress(integer amount of bytes processed).
+ """
+ while True:
+ data = fp.read(DEFAULT_FILE_BUFFER_SIZE)
+ if not data:
+ break
+ for hash_alg in hash_dict.itervalues():
+ hash_alg.update(data)
+ if callback_processor:
+ callback_processor.Progress(len(data))
+
+
+def CalculateB64EncodedCrc32cFromContents(fp):
+ """Calculates a base64 CRC32c checksum of the contents of a seekable stream.
+
+ This function sets the stream position 0 before and after calculation.
+
+ Args:
+ fp: An already-open file object.
+
+ Returns:
+ CRC32c checksum of the file in base64 format.
+ """
+ return _CalculateB64EncodedHashFromContents(
+ fp, crcmod.predefined.Crc('crc-32c'))
+
+
+def CalculateB64EncodedMd5FromContents(fp):
+ """Calculates a base64 MD5 digest of the contents of a seekable stream.
+
+ This function sets the stream position 0 before and after calculation.
+
+ Args:
+ fp: An already-open file object.
+
+ Returns:
+ MD5 digest of the file in base64 format.
+ """
+ return _CalculateB64EncodedHashFromContents(fp, md5())
+
+
+def CalculateMd5FromContents(fp):
+ """Calculates a base64 MD5 digest of the contents of a seekable stream.
+
+ This function sets the stream position 0 before and after calculation.
+
+ Args:
+ fp: An already-open file object.
+
+ Returns:
+ MD5 digest of the file in hex format.
+ """
+ return _CalculateHashFromContents(fp, md5())
+
+
+def Base64EncodeHash(digest_value):
+ """Returns the base64-encoded version of the input hex digest value."""
+ return base64.encodestring(binascii.unhexlify(digest_value)).rstrip('\n')
+
+
+def Base64ToHexHash(base64_hash):
+ """Returns the hex digest value of the input base64-encoded hash.
+
+ Args:
+ base64_hash: Base64-encoded hash, which may contain newlines and single or
+ double quotes.
+
+ Returns:
+ Hex digest of the input argument.
+ """
+ return binascii.hexlify(base64.decodestring(base64_hash.strip('\n"\'')))
+
+
+def _CalculateB64EncodedHashFromContents(fp, hash_alg):
+ """Calculates a base64 digest of the contents of a seekable stream.
+
+ This function sets the stream position 0 before and after calculation.
+
+ Args:
+ fp: An already-open file object.
+ hash_alg: Instance of hashing class initialized to start state.
+
+ Returns:
+ Hash of the stream in base64 format.
+ """
+ return Base64EncodeHash(_CalculateHashFromContents(fp, hash_alg))
+
+
+def GetUploadHashAlgs():
+ """Returns a dict of hash algorithms for validating an uploaded object.
+
+ This is for use only with single object uploads, not compose operations
+ such as those used by parallel composite uploads (though it can be used to
+ validate the individual components).
+
+ Returns:
+ dict of (algorithm_name: hash_algorithm)
+ """
+ check_hashes_config = config.get(
+ 'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL)
+ if check_hashes_config == 'never':
+ return {}
+ return {'md5': md5}
+
+
+def GetDownloadHashAlgs(logger, src_has_md5=False, src_has_crc32c=False):
+ """Returns a dict of hash algorithms for validating an object.
+
+ Args:
+ logger: logging.Logger for outputting log messages.
+ src_has_md5: If True, source object has an md5 hash.
+ src_has_crc32c: If True, source object has a crc32c hash.
+
+ Returns:
+ Dict of (string, hash algorithm).
+
+ Raises:
+ CommandException if hash algorithms satisfying the boto config file
+ cannot be returned.
+ """
+ check_hashes_config = config.get(
+ 'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL)
+ if check_hashes_config == CHECK_HASH_NEVER:
+ return {}
+
+ hash_algs = {}
+ if src_has_md5:
+ hash_algs['md5'] = md5
+ elif src_has_crc32c:
+ # If the cloud provider supplies a CRC, we'll compute a checksum to
+ # validate if we're using a native crcmod installation and MD5 isn't
+ # offered as an alternative.
+ if UsingCrcmodExtension(crcmod):
+ hash_algs['crc32c'] = lambda: crcmod.predefined.Crc('crc-32c')
+ elif not hash_algs:
+ if check_hashes_config == CHECK_HASH_IF_FAST_ELSE_FAIL:
+ raise CommandException(_SLOW_CRC_EXCEPTION_TEXT)
+ elif check_hashes_config == CHECK_HASH_IF_FAST_ELSE_SKIP:
+ logger.warn(_NO_HASH_CHECK_WARNING)
+ elif check_hashes_config == CHECK_HASH_ALWAYS:
+ logger.warn(_SLOW_CRCMOD_DOWNLOAD_WARNING)
+ hash_algs['crc32c'] = lambda: crcmod.predefined.Crc('crc-32c')
+ else:
+ raise CommandException(
+ 'Your boto config \'check_hashes\' option is misconfigured.')
+
+ return hash_algs
+
+
+class HashingFileUploadWrapper(object):
+ """Wraps an input stream in a hash digester and exposes a stream interface.
+
+ This class provides integrity checking during file uploads via the
+ following properties:
+
+ Calls to read will appropriately update digesters with all bytes read.
+ Calls to seek (assuming it is supported by the wrapped stream) using
+ os.SEEK_SET will catch up / reset the digesters to the specified
+ position. If seek is called with a different os.SEEK mode, the caller
+ must return to the original position using os.SEEK_SET before further
+ reads.
+ Calls to seek are fast if the desired position is equal to the position at
+ the beginning of the last read call (we only need to re-hash bytes
+ from that point on).
+ """
+
+ def __init__(self, stream, digesters, hash_algs, src_url, logger):
+ """Initializes the wrapper.
+
+ Args:
+ stream: Input stream.
+ digesters: dict of {string: hash digester} containing digesters, where
+ string is the name of the hash algorithm.
+ hash_algs: dict of {string: hash algorithm} for resetting and
+ recalculating digesters. String is the name of the hash algorithm.
+ src_url: Source FileUrl that is being copied.
+ logger: For outputting log messages.
+ """
+ if not digesters:
+ raise CommandException('HashingFileUploadWrapper used with no digesters.')
+ elif not hash_algs:
+ raise CommandException('HashingFileUploadWrapper used with no hash_algs.')
+
+ self._orig_fp = stream
+ self._digesters = digesters
+ self._src_url = src_url
+ self._logger = logger
+ self._seek_away = None
+
+ self._digesters_previous = {}
+ for alg in self._digesters:
+ self._digesters_previous[alg] = self._digesters[alg].copy()
+ self._digesters_previous_mark = 0
+ self._digesters_current_mark = 0
+ self._hash_algs = hash_algs
+
+ def read(self, size=-1): # pylint: disable=invalid-name
+ """"Reads from the wrapped file pointer and calculates hash digests.
+
+ Args:
+ size: The amount of bytes to read. If ommited or negative, the entire
+ contents of the file will be read, hashed, and returned.
+
+ Returns:
+ Bytes from the wrapped stream.
+
+ Raises:
+ CommandException if the position of the wrapped stream is unknown.
+ """
+ if self._seek_away is not None:
+ raise CommandException('Read called on hashing file pointer in an '
+ 'unknown position; cannot correctly compute '
+ 'digest.')
+
+ data = self._orig_fp.read(size)
+ self._digesters_previous_mark = self._digesters_current_mark
+ for alg in self._digesters:
+ self._digesters_previous[alg] = self._digesters[alg].copy()
+ self._digesters[alg].update(data)
+ self._digesters_current_mark += len(data)
+ return data
+
+ def tell(self): # pylint: disable=invalid-name
+ """Returns the current stream position."""
+ return self._orig_fp.tell()
+
+ def seekable(self): # pylint: disable=invalid-name
+ """Returns true if the stream is seekable."""
+ return self._orig_fp.seekable()
+
+ def seek(self, offset, whence=os.SEEK_SET): # pylint: disable=invalid-name
+ """Seeks in the wrapped file pointer and catches up hash digests.
+
+ Args:
+ offset: The offset to seek to.
+ whence: os.SEEK_CUR, or SEEK_END, SEEK_SET.
+
+ Returns:
+ Return value from the wrapped stream's seek call.
+ """
+ if whence != os.SEEK_SET:
+ # We do not catch up hashes for non-absolute seeks, and rely on the
+ # caller to seek to an absolute position before reading.
+ self._seek_away = self._orig_fp.tell()
+
+ else:
+ # Hashes will be correct and it's safe to call read().
+ self._seek_away = None
+ if offset < self._digesters_previous_mark:
+ # This is earlier than our earliest saved digest, so we need to
+ # reset the digesters and scan from the beginning.
+ for alg in self._digesters:
+ self._digesters[alg] = self._hash_algs[alg]()
+ self._digesters_current_mark = 0
+ self._orig_fp.seek(0)
+ self._CatchUp(offset)
+
+ elif offset == self._digesters_previous_mark:
+ # Just load the saved digests.
+ self._digesters_current_mark = self._digesters_previous_mark
+ for alg in self._digesters:
+ self._digesters[alg] = self._digesters_previous[alg]
+
+ elif offset < self._digesters_current_mark:
+ # Reset the position to our previous digest and scan forward.
+ self._digesters_current_mark = self._digesters_previous_mark
+ for alg in self._digesters:
+ self._digesters[alg] = self._digesters_previous[alg]
+ self._orig_fp.seek(self._digesters_previous_mark)
+ self._CatchUp(offset - self._digesters_previous_mark)
+
+ else:
+ # Scan forward from our current digest and position.
+ self._orig_fp.seek(self._digesters_current_mark)
+ self._CatchUp(offset - self._digesters_current_mark)
+
+ return self._orig_fp.seek(offset, whence)
+
+ def _CatchUp(self, bytes_to_read):
+ """Catches up hashes, but does not return data and uses little memory.
+
+ Before calling this function, digesters_current_mark should be updated
+ to the current location of the original stream and the self._digesters
+ should be current to that point (but no further).
+
+ Args:
+ bytes_to_read: Number of bytes to catch up from the original stream.
+ """
+ if self._orig_fp.tell() != self._digesters_current_mark:
+ raise CommandException(
+ 'Invalid mark when catching up hashes. Stream position %s, hash '
+ 'position %s' % (self._orig_fp.tell(), self._digesters_current_mark))
+
+ for alg in self._digesters:
+ if bytes_to_read >= MIN_SIZE_COMPUTE_LOGGING:
+ self._logger.info('Catching up %s for %s...', alg,
+ self._src_url.url_string)
+ self._digesters_previous[alg] = self._digesters[alg].copy()
+
+ self._digesters_previous_mark = self._digesters_current_mark
+ bytes_remaining = bytes_to_read
+ bytes_this_round = min(bytes_remaining, TRANSFER_BUFFER_SIZE)
+ while bytes_this_round:
+ data = self._orig_fp.read(bytes_this_round)
+ bytes_remaining -= bytes_this_round
+ for alg in self._digesters:
+ self._digesters[alg].update(data)
+ bytes_this_round = min(bytes_remaining, TRANSFER_BUFFER_SIZE)
+ self._digesters_current_mark += bytes_to_read
« no previous file with comments | « third_party/gsutil/gslib/gcs_json_media.py ('k') | third_party/gsutil/gslib/help_provider.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698