| OLD | NEW |
| (Empty) |
| 1 # -*- coding: utf-8 -*- | |
| 2 # Copyright 2014 Google Inc. All Rights Reserved. | |
| 3 # | |
| 4 # Licensed under the Apache License, Version 2.0 (the "License"); | |
| 5 # you may not use this file except in compliance with the License. | |
| 6 # You may obtain a copy of the License at | |
| 7 # | |
| 8 # http://www.apache.org/licenses/LICENSE-2.0 | |
| 9 # | |
| 10 # Unless required by applicable law or agreed to in writing, software | |
| 11 # distributed under the License is distributed on an "AS IS" BASIS, | |
| 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| 13 # See the License for the specific language governing permissions and | |
| 14 # limitations under the License. | |
| 15 """Helper functions for hashing functionality.""" | |
| 16 | |
| 17 import base64 | |
| 18 import binascii | |
| 19 from hashlib import md5 | |
| 20 import os | |
| 21 | |
| 22 from boto import config | |
| 23 import crcmod | |
| 24 | |
| 25 from gslib.exception import CommandException | |
| 26 from gslib.util import DEFAULT_FILE_BUFFER_SIZE | |
| 27 from gslib.util import MIN_SIZE_COMPUTE_LOGGING | |
| 28 from gslib.util import TRANSFER_BUFFER_SIZE | |
| 29 from gslib.util import UsingCrcmodExtension | |
| 30 | |
| 31 | |
# Warning printed when the user has requested checksumming but only the slow
# pure-Python crcmod implementation is available (no compiled C extension).
SLOW_CRCMOD_WARNING = """
WARNING: You have requested checksumming but your crcmod installation isn't
using the module's C extension, so checksumming will run very slowly. For help
installing the extension, please see:
$ gsutil help crcmod
"""


# Non-fatal warning used when a composite-object download proceeds with the
# slow CRC32c check (check_hashes == 'always').
_SLOW_CRCMOD_DOWNLOAD_WARNING = """
WARNING: Downloading this composite object requires integrity checking with
CRC32c, but your crcmod installation isn't using the module's C extension,
so the hash computation will likely throttle download performance. For help
installing the extension, please see:
$ gsutil help crcmod
To disable slow integrity checking, see the "check_hashes" option in your
boto config file.
"""

# Exception text used when check_hashes == 'if_fast_else_fail' and only the
# slow CRC32c check would be possible; the download is aborted instead.
_SLOW_CRC_EXCEPTION_TEXT = """
Downloading this composite object requires integrity checking with CRC32c,
but your crcmod installation isn't using the module's C extension, so the
hash computation will likely throttle download performance. For help
installing the extension, please see:

$ gsutil help crcmod

To download regardless of crcmod performance or to skip slow integrity
checks, see the "check_hashes" option in your boto config file.

NOTE: It is strongly recommended that you not disable integrity checks. Doing so
could allow data corruption to go undetected during uploading/downloading."""


# Warning used when check_hashes == 'if_fast_else_skip' and the download
# proceeds with no integrity checking at all.
_NO_HASH_CHECK_WARNING = """
WARNING: This download will not be validated since your crcmod installation
doesn't use the module's C extension, so the hash computation would likely
throttle download performance. For help in installing the extension, please
see:
$ gsutil help crcmod
To force integrity checking, see the "check_hashes" option in your boto config
file.
"""


# Configuration values for hashing: the valid settings of the boto config
# "check_hashes" option (GSUtil section), read by Get*HashAlgs below.
CHECK_HASH_IF_FAST_ELSE_FAIL = 'if_fast_else_fail'  # Fail rather than check slowly.
CHECK_HASH_IF_FAST_ELSE_SKIP = 'if_fast_else_skip'  # Skip slow checks entirely.
CHECK_HASH_ALWAYS = 'always'  # Check even when it is slow.
CHECK_HASH_NEVER = 'never'  # Never perform integrity checks.
| 81 | |
| 82 | |
def _CalculateHashFromContents(fp, hash_alg):
  """Computes a hex digest over the full contents of a seekable stream.

  The stream position is reset to 0 both before and after hashing.

  Args:
    fp: An already-open file object.
    hash_alg: Instance of a hashing class initialized to start state.

  Returns:
    Hash of the stream in hex string format.
  """
  digesters = {'placeholder': hash_alg}
  fp.seek(0)
  CalculateHashesFromContents(fp, digesters)
  fp.seek(0)
  return digesters['placeholder'].hexdigest()
| 100 | |
| 101 | |
def CalculateHashesFromContents(fp, hash_dict, callback_processor=None):
  """Calculates hashes of the contents of a file.

  The stream is consumed from its current position to EOF; the position is
  not reset afterward.

  Args:
    fp: An already-open file object (stream will be consumed).
    hash_dict: Dict of (string alg_name: initialized hashing class)
        Hashing class will be populated with digests upon return.
    callback_processor: Optional callback processing class that implements
        Progress(integer amount of bytes processed).
  """
  while True:
    data = fp.read(DEFAULT_FILE_BUFFER_SIZE)
    if not data:
      break
    # Use .values() instead of the Python-2-only .itervalues() so this code
    # stays portable; the dict holds only a handful of digesters, so
    # materializing the values costs nothing.
    for hash_alg in hash_dict.values():
      hash_alg.update(data)
    if callback_processor:
      callback_processor.Progress(len(data))
| 120 | |
| 121 | |
def CalculateB64EncodedCrc32cFromContents(fp):
  """Computes a base64-encoded CRC32c checksum of a seekable stream.

  The stream position is set to 0 before and after the calculation.

  Args:
    fp: An already-open file object.

  Returns:
    CRC32c checksum of the file in base64 format.
  """
  crc_digester = crcmod.predefined.Crc('crc-32c')
  return _CalculateB64EncodedHashFromContents(fp, crc_digester)
| 136 | |
def CalculateB64EncodedMd5FromContents(fp):
  """Computes a base64-encoded MD5 digest of a seekable stream.

  The stream position is set to 0 before and after the calculation.

  Args:
    fp: An already-open file object.

  Returns:
    MD5 digest of the file in base64 format.
  """
  md5_digester = md5()
  return _CalculateB64EncodedHashFromContents(fp, md5_digester)
| 149 | |
| 150 | |
def CalculateMd5FromContents(fp):
  """Computes a hex-encoded MD5 digest of a seekable stream.

  The stream position is set to 0 before and after the calculation.

  Args:
    fp: An already-open file object.

  Returns:
    MD5 digest of the file in hex format.
  """
  md5_digester = md5()
  return _CalculateHashFromContents(fp, md5_digester)
| 163 | |
| 164 | |
def Base64EncodeHash(digest_value):
  """Returns the base64-encoded version of the input hex digest value.

  Args:
    digest_value: Hex digest string, as produced by hashlib's hexdigest().

  Returns:
    Base64-encoded digest with no trailing newline.
  """
  # base64.b64encode never inserts newlines, unlike the deprecated
  # base64.encodestring (removed in Python 3.9), which wrapped output every
  # 76 characters; this also makes a trailing rstrip('\n') unnecessary.
  return base64.b64encode(binascii.unhexlify(digest_value))
| 168 | |
| 169 | |
def Base64ToHexHash(base64_hash):
  """Returns the hex digest value of the input base64-encoded hash.

  Args:
    base64_hash: Base64-encoded hash, which may contain newlines and single or
        double quotes.

  Returns:
    Hex digest of the input argument.
  """
  # base64.b64decode replaces the deprecated base64.decodestring (removed in
  # Python 3.9); both are thin wrappers over binascii.a2b_base64.
  return binascii.hexlify(base64.b64decode(base64_hash.strip('\n"\'')))
| 181 | |
| 182 | |
def _CalculateB64EncodedHashFromContents(fp, hash_alg):
  """Computes a base64-encoded digest of the contents of a seekable stream.

  The stream position is set to 0 before and after the calculation.

  Args:
    fp: An already-open file object.
    hash_alg: Instance of a hashing class initialized to start state.

  Returns:
    Hash of the stream in base64 format.
  """
  hex_digest = _CalculateHashFromContents(fp, hash_alg)
  return Base64EncodeHash(hex_digest)
| 196 | |
| 197 | |
def GetUploadHashAlgs():
  """Returns a dict of hash algorithms for validating an uploaded object.

  This is for use only with single object uploads, not compose operations
  such as those used by parallel composite uploads (though it can be used to
  validate the individual components).

  Returns:
    dict of (algorithm_name: hash_algorithm)
  """
  check_hashes_config = config.get(
      'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL)
  # Compare against the named constant rather than the literal 'never', for
  # consistency with GetDownloadHashAlgs.
  if check_hashes_config == CHECK_HASH_NEVER:
    return {}
  return {'md5': md5}
| 213 | |
| 214 | |
def GetDownloadHashAlgs(logger, src_has_md5=False, src_has_crc32c=False):
  """Returns a dict of hash algorithms for validating an object.

  Args:
    logger: logging.Logger for outputting log messages.
    src_has_md5: If True, source object has an md5 hash.
    src_has_crc32c: If True, source object has a crc32c hash.

  Returns:
    Dict of (string, hash algorithm).

  Raises:
    CommandException if hash algorithms satisfying the boto config file
    cannot be returned.
  """
  check_hashes = config.get(
      'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL)
  if check_hashes == CHECK_HASH_NEVER:
    return {}

  algs = {}
  if src_has_md5:
    algs['md5'] = md5
  elif src_has_crc32c:
    # MD5 is preferred; fall back to CRC32c only when it is the sole hash the
    # cloud provider supplies.
    make_crc32c = lambda: crcmod.predefined.Crc('crc-32c')
    if UsingCrcmodExtension(crcmod):
      # Native (C-extension) crcmod is fast enough to always use.
      algs['crc32c'] = make_crc32c
    elif not algs:
      # Only the slow pure-Python CRC32c is available; what happens next is
      # governed by the boto config "check_hashes" setting.
      if check_hashes == CHECK_HASH_IF_FAST_ELSE_FAIL:
        raise CommandException(_SLOW_CRC_EXCEPTION_TEXT)
      elif check_hashes == CHECK_HASH_IF_FAST_ELSE_SKIP:
        logger.warn(_NO_HASH_CHECK_WARNING)
      elif check_hashes == CHECK_HASH_ALWAYS:
        logger.warn(_SLOW_CRCMOD_DOWNLOAD_WARNING)
        algs['crc32c'] = make_crc32c
      else:
        raise CommandException(
            'Your boto config \'check_hashes\' option is misconfigured.')

  return algs
| 257 | |
| 258 | |
class HashingFileUploadWrapper(object):
  """Wraps an input stream in a hash digester and exposes a stream interface.

  This class provides integrity checking during file uploads via the
  following properties:

  Calls to read will appropriately update digesters with all bytes read.
  Calls to seek (assuming it is supported by the wrapped stream) using
      os.SEEK_SET will catch up / reset the digesters to the specified
      position. If seek is called with a different os.SEEK mode, the caller
      must return to the original position using os.SEEK_SET before further
      reads.
  Calls to seek are fast if the desired position is equal to the position at
      the beginning of the last read call (we only need to re-hash bytes
      from that point on).
  """

  def __init__(self, stream, digesters, hash_algs, src_url, logger):
    """Initializes the wrapper.

    Args:
      stream: Input stream.
      digesters: dict of {string: hash digester} containing digesters, where
          string is the name of the hash algorithm.
      hash_algs: dict of {string: hash algorithm} for resetting and
          recalculating digesters. String is the name of the hash algorithm.
      src_url: Source FileUrl that is being copied.
      logger: For outputting log messages.

    Raises:
      CommandException: if digesters or hash_algs is empty or None.
    """
    if not digesters:
      raise CommandException('HashingFileUploadWrapper used with no digesters.')
    elif not hash_algs:
      raise CommandException('HashingFileUploadWrapper used with no hash_algs.')

    self._orig_fp = stream
    self._digesters = digesters
    self._src_url = src_url
    self._logger = logger
    # Set (to the pre-seek stream position) only after a non-absolute seek;
    # while non-None, read() is disallowed because the digesters no longer
    # correspond to the stream position.
    self._seek_away = None

    # Snapshot of each digester taken at _digesters_previous_mark; lets a
    # seek back to the start of the last read restore hashes without
    # re-reading the stream.
    self._digesters_previous = {}
    for alg in self._digesters:
      self._digesters_previous[alg] = self._digesters[alg].copy()
    # Stream offset that the snapshot digesters correspond to.
    self._digesters_previous_mark = 0
    # Stream offset up to which self._digesters have consumed bytes.
    self._digesters_current_mark = 0
    self._hash_algs = hash_algs

  def read(self, size=-1):  # pylint: disable=invalid-name
    """Reads from the wrapped file pointer and calculates hash digests.

    Args:
      size: The amount of bytes to read. If omitted or negative, the entire
          contents of the file will be read, hashed, and returned.

    Returns:
      Bytes from the wrapped stream.

    Raises:
      CommandException if the position of the wrapped stream is unknown.
    """
    if self._seek_away is not None:
      raise CommandException('Read called on hashing file pointer in an '
                             'unknown position; cannot correctly compute '
                             'digest.')

    data = self._orig_fp.read(size)
    # Snapshot the digesters before updating them so that a later seek back
    # to this position can restore the hash state cheaply.
    self._digesters_previous_mark = self._digesters_current_mark
    for alg in self._digesters:
      self._digesters_previous[alg] = self._digesters[alg].copy()
      self._digesters[alg].update(data)
    self._digesters_current_mark += len(data)
    return data

  def tell(self):  # pylint: disable=invalid-name
    """Returns the current stream position."""
    return self._orig_fp.tell()

  def seekable(self):  # pylint: disable=invalid-name
    """Returns true if the stream is seekable."""
    return self._orig_fp.seekable()

  def seek(self, offset, whence=os.SEEK_SET):  # pylint: disable=invalid-name
    """Seeks in the wrapped file pointer and catches up hash digests.

    Args:
      offset: The offset to seek to.
      whence: os.SEEK_SET (absolute), os.SEEK_CUR, or os.SEEK_END. Only
          os.SEEK_SET keeps the digesters valid; other modes require a
          subsequent absolute seek before read() may be called again.

    Returns:
      Return value from the wrapped stream's seek call.
    """
    if whence != os.SEEK_SET:
      # We do not catch up hashes for non-absolute seeks, and rely on the
      # caller to seek to an absolute position before reading.
      self._seek_away = self._orig_fp.tell()

    else:
      # Hashes will be correct and it's safe to call read().
      self._seek_away = None
      if offset < self._digesters_previous_mark:
        # This is earlier than our earliest saved digest, so we need to
        # reset the digesters and scan from the beginning.
        for alg in self._digesters:
          self._digesters[alg] = self._hash_algs[alg]()
        self._digesters_current_mark = 0
        self._orig_fp.seek(0)
        self._CatchUp(offset)

      elif offset == self._digesters_previous_mark:
        # Just load the saved digests.
        self._digesters_current_mark = self._digesters_previous_mark
        for alg in self._digesters:
          self._digesters[alg] = self._digesters_previous[alg]

      elif offset < self._digesters_current_mark:
        # Reset the position to our previous digest and scan forward.
        self._digesters_current_mark = self._digesters_previous_mark
        for alg in self._digesters:
          self._digesters[alg] = self._digesters_previous[alg]
        self._orig_fp.seek(self._digesters_previous_mark)
        self._CatchUp(offset - self._digesters_previous_mark)

      else:
        # Scan forward from our current digest and position.
        self._orig_fp.seek(self._digesters_current_mark)
        self._CatchUp(offset - self._digesters_current_mark)

    return self._orig_fp.seek(offset, whence)

  def _CatchUp(self, bytes_to_read):
    """Catches up hashes, but does not return data and uses little memory.

    Before calling this function, digesters_current_mark should be updated
    to the current location of the original stream and the self._digesters
    should be current to that point (but no further).

    Args:
      bytes_to_read: Number of bytes to catch up from the original stream.

    Raises:
      CommandException: if the wrapped stream's position does not match
          self._digesters_current_mark (hashes would otherwise be computed
          over the wrong bytes).
    """
    if self._orig_fp.tell() != self._digesters_current_mark:
      raise CommandException(
          'Invalid mark when catching up hashes. Stream position %s, hash '
          'position %s' % (self._orig_fp.tell(), self._digesters_current_mark))

    for alg in self._digesters:
      if bytes_to_read >= MIN_SIZE_COMPUTE_LOGGING:
        # Catching up a large range can be slow; tell the user what's
        # happening.
        self._logger.info('Catching up %s for %s...', alg,
                          self._src_url.url_string)
      self._digesters_previous[alg] = self._digesters[alg].copy()

    self._digesters_previous_mark = self._digesters_current_mark
    bytes_remaining = bytes_to_read
    bytes_this_round = min(bytes_remaining, TRANSFER_BUFFER_SIZE)
    while bytes_this_round:
      data = self._orig_fp.read(bytes_this_round)
      bytes_remaining -= bytes_this_round
      for alg in self._digesters:
        self._digesters[alg].update(data)
      bytes_this_round = min(bytes_remaining, TRANSFER_BUFFER_SIZE)
    self._digesters_current_mark += bytes_to_read
| OLD | NEW |