# -*- coding: utf-8 -*-
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper functions for hashing functionality."""

import base64
import binascii
from hashlib import md5
import os

from boto import config
import crcmod

from gslib.exception import CommandException
from gslib.util import DEFAULT_FILE_BUFFER_SIZE
from gslib.util import MIN_SIZE_COMPUTE_LOGGING
from gslib.util import TRANSFER_BUFFER_SIZE
from gslib.util import UsingCrcmodExtension


SLOW_CRCMOD_WARNING = """
WARNING: You have requested checksumming but your crcmod installation isn't
using the module's C extension, so checksumming will run very slowly. For help
installing the extension, please see:
  $ gsutil help crcmod
"""


_SLOW_CRCMOD_DOWNLOAD_WARNING = """
WARNING: Downloading this composite object requires integrity checking with
CRC32c, but your crcmod installation isn't using the module's C extension,
so the hash computation will likely throttle download performance. For help
installing the extension, please see:
  $ gsutil help crcmod
To disable slow integrity checking, see the "check_hashes" option in your
boto config file.
"""

_SLOW_CRC_EXCEPTION_TEXT = """
Downloading this composite object requires integrity checking with CRC32c,
but your crcmod installation isn't using the module's C extension, so the
hash computation will likely throttle download performance. For help
installing the extension, please see:

  $ gsutil help crcmod

To download regardless of crcmod performance or to skip slow integrity
checks, see the "check_hashes" option in your boto config file.

NOTE: It is strongly recommended that you not disable integrity checks. Doing so
could allow data corruption to go undetected during uploading/downloading."""


_NO_HASH_CHECK_WARNING = """
WARNING: This download will not be validated since your crcmod installation
doesn't use the module's C extension, so the hash computation would likely
throttle download performance. For help installing the extension, please see:
  $ gsutil help crcmod
To force integrity checking, see the "check_hashes" option in your boto config
file.
"""


# Configuration values for hashing.
CHECK_HASH_IF_FAST_ELSE_FAIL = 'if_fast_else_fail'
CHECK_HASH_IF_FAST_ELSE_SKIP = 'if_fast_else_skip'
CHECK_HASH_ALWAYS = 'always'
CHECK_HASH_NEVER = 'never'


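# The "check_hashes" values above are read from the boto config file's
# [GSUtil] section (see the config.get() calls below). An illustrative
# stanza (a sketch, not a complete config file):
#
#   [GSUtil]
#   check_hashes = if_fast_else_fail

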
def _CalculateHashFromContents(fp, hash_alg):
  """Calculates the hash digest of the contents of a seekable stream.

  This function sets the stream position to 0 before and after calculation.

  Args:
    fp: An already-open file object.
    hash_alg: Instance of hashing class initialized to start state.

  Returns:
    Hash of the stream in hex string format.
  """
  hash_dict = {'placeholder': hash_alg}
  fp.seek(0)
  CalculateHashesFromContents(fp, hash_dict)
  fp.seek(0)
  return hash_dict['placeholder'].hexdigest()


def CalculateHashesFromContents(fp, hash_dict, callback_processor=None):
  """Calculates hashes of the contents of a file.

  Args:
    fp: An already-open file object (stream will be consumed).
    hash_dict: Dict of (string alg_name: initialized hashing class); each
        hashing class will be populated with the digest of the stream's
        contents upon return.
    callback_processor: Optional callback processing class that implements
        Progress(integer amount of bytes processed).
  """
  while True:
    data = fp.read(DEFAULT_FILE_BUFFER_SIZE)
    if not data:
      break
    for hash_alg in hash_dict.itervalues():
      hash_alg.update(data)
    if callback_processor:
      callback_processor.Progress(len(data))


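# A minimal sketch of computing two digests in a single pass with the helper
# above (illustrative only; the file path is hypothetical, and CRC32c is only
# reasonably fast with a compiled crcmod installation):
#
#   from hashlib import md5
#   import crcmod
#
#   with open('some_file', 'rb') as fp:
#     digests = {'md5': md5(), 'crc32c': crcmod.predefined.Crc('crc-32c')}
#     CalculateHashesFromContents(fp, digests)
#     print digests['md5'].hexdigest()
#     print digests['crc32c'].hexdigest()

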
def CalculateB64EncodedCrc32cFromContents(fp):
  """Calculates a base64 CRC32c checksum of the contents of a seekable stream.

  This function sets the stream position to 0 before and after calculation.

  Args:
    fp: An already-open file object.

  Returns:
    CRC32c checksum of the file in base64 format.
  """
  return _CalculateB64EncodedHashFromContents(
      fp, crcmod.predefined.Crc('crc-32c'))


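# For example (an illustrative sketch): the CRC32c of empty input is 0, so a
# zero-length stream yields the base64 encoding of four zero bytes:
#
#   >>> import io
#   >>> CalculateB64EncodedCrc32cFromContents(io.BytesIO(''))
#   'AAAAAA=='

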
def CalculateB64EncodedMd5FromContents(fp):
  """Calculates a base64 MD5 digest of the contents of a seekable stream.

  This function sets the stream position to 0 before and after calculation.

  Args:
    fp: An already-open file object.

  Returns:
    MD5 digest of the file in base64 format.
  """
  return _CalculateB64EncodedHashFromContents(fp, md5())


def CalculateMd5FromContents(fp):
  """Calculates a hex MD5 digest of the contents of a seekable stream.

  This function sets the stream position to 0 before and after calculation.

  Args:
    fp: An already-open file object.

  Returns:
    MD5 digest of the file in hex format.
  """
  return _CalculateHashFromContents(fp, md5())


def Base64EncodeHash(digest_value):
  """Returns the base64-encoded version of the input hex digest value."""
  return base64.encodestring(binascii.unhexlify(digest_value)).rstrip('\n')


def Base64ToHexHash(base64_hash):
  """Returns the hex digest value of the input base64-encoded hash.

  Args:
    base64_hash: Base64-encoded hash, which may contain newlines and single or
        double quotes.

  Returns:
    Hex digest of the input argument.
  """
  return binascii.hexlify(base64.decodestring(base64_hash.strip('\n"\'')))


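# These two helpers are inverses of each other. For example (a sketch using
# the well-known MD5 digest of the empty string):
#
#   >>> Base64EncodeHash('d41d8cd98f00b204e9800998ecf8427e')
#   '1B2M2Y8AsgTpgAmY7PhCfg=='
#   >>> Base64ToHexHash('1B2M2Y8AsgTpgAmY7PhCfg==')
#   'd41d8cd98f00b204e9800998ecf8427e'

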
def _CalculateB64EncodedHashFromContents(fp, hash_alg):
  """Calculates a base64 digest of the contents of a seekable stream.

  This function sets the stream position to 0 before and after calculation.

  Args:
    fp: An already-open file object.
    hash_alg: Instance of hashing class initialized to start state.

  Returns:
    Hash of the stream in base64 format.
  """
  return Base64EncodeHash(_CalculateHashFromContents(fp, hash_alg))


def GetUploadHashAlgs():
  """Returns a dict of hash algorithms for validating an uploaded object.

  This is for use only with single object uploads, not compose operations
  such as those used by parallel composite uploads (though it can be used to
  validate the individual components).

  Returns:
    dict of (algorithm_name: hash_algorithm)
  """
  check_hashes_config = config.get(
      'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL)
  if check_hashes_config == CHECK_HASH_NEVER:
    return {}
  return {'md5': md5}


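# Callers typically instantiate the returned algorithm constructors to build
# per-object digesters. A minimal sketch:
#
#   digesters = dict((alg, alg_constructor()) for alg, alg_constructor
#                    in GetUploadHashAlgs().iteritems())
#   # digesters can then be passed to HashingFileUploadWrapper below.

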
def GetDownloadHashAlgs(logger, src_has_md5=False, src_has_crc32c=False):
  """Returns a dict of hash algorithms for validating a downloaded object.

  Args:
    logger: logging.Logger for outputting log messages.
    src_has_md5: If True, the source object has an md5 hash.
    src_has_crc32c: If True, the source object has a crc32c hash.

  Returns:
    Dict of (string, hash algorithm).

  Raises:
    CommandException if hash algorithms satisfying the boto config file
    cannot be returned.
  """
  check_hashes_config = config.get(
      'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL)
  if check_hashes_config == CHECK_HASH_NEVER:
    return {}

  hash_algs = {}
  if src_has_md5:
    hash_algs['md5'] = md5
  elif src_has_crc32c:
    # MD5 isn't available, but the cloud provider supplies a CRC32c hash; use
    # it for validation if we have a compiled crcmod installation. Otherwise,
    # fall back to the behavior dictated by the check_hashes config value.
    if UsingCrcmodExtension(crcmod):
      hash_algs['crc32c'] = lambda: crcmod.predefined.Crc('crc-32c')
    elif not hash_algs:
      if check_hashes_config == CHECK_HASH_IF_FAST_ELSE_FAIL:
        raise CommandException(_SLOW_CRC_EXCEPTION_TEXT)
      elif check_hashes_config == CHECK_HASH_IF_FAST_ELSE_SKIP:
        logger.warn(_NO_HASH_CHECK_WARNING)
      elif check_hashes_config == CHECK_HASH_ALWAYS:
        logger.warn(_SLOW_CRCMOD_DOWNLOAD_WARNING)
        hash_algs['crc32c'] = lambda: crcmod.predefined.Crc('crc-32c')
      else:
        raise CommandException(
            'Your boto config \'check_hashes\' option is misconfigured.')

  return hash_algs


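# A usage sketch (the logger and source-object metadata are hypothetical):
#
#   import logging
#   logger = logging.getLogger()
#   hash_algs = GetDownloadHashAlgs(logger, src_has_crc32c=True)
#   # With a compiled crcmod, hash_algs == {'crc32c': <constructor>}; calling
#   # the constructor yields a fresh digester for each download attempt.

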
class HashingFileUploadWrapper(object):
  """Wraps an input stream in a hash digester and exposes a stream interface.

  This class provides integrity checking during file uploads via the
  following properties:

  Calls to read will appropriately update digesters with all bytes read.
  Calls to seek (assuming it is supported by the wrapped stream) using
      os.SEEK_SET will catch up / reset the digesters to the specified
      position. If seek is called with a different os.SEEK mode, the caller
      must return to the original position using os.SEEK_SET before further
      reads.
  Calls to seek are fast if the desired position is equal to the position at
      the beginning of the last read call (we only need to re-hash bytes
      from that point on).
  """

  def __init__(self, stream, digesters, hash_algs, src_url, logger):
    """Initializes the wrapper.

    Args:
      stream: Input stream.
      digesters: dict of {string: hash digester} containing digesters, where
          string is the name of the hash algorithm.
      hash_algs: dict of {string: hash algorithm} for resetting and
          recalculating digesters. String is the name of the hash algorithm.
      src_url: Source FileUrl that is being copied.
      logger: For outputting log messages.
    """
    if not digesters:
      raise CommandException('HashingFileUploadWrapper used with no digesters.')
    elif not hash_algs:
      raise CommandException('HashingFileUploadWrapper used with no hash_algs.')

    self._orig_fp = stream
    self._digesters = digesters
    self._src_url = src_url
    self._logger = logger
    self._seek_away = None

    self._digesters_previous = {}
    for alg in self._digesters:
      self._digesters_previous[alg] = self._digesters[alg].copy()
    self._digesters_previous_mark = 0
    self._digesters_current_mark = 0
    self._hash_algs = hash_algs

  def read(self, size=-1):  # pylint: disable=invalid-name
    """Reads from the wrapped file pointer and calculates hash digests.

    Args:
      size: The number of bytes to read. If omitted or negative, the entire
          contents of the file will be read, hashed, and returned.

    Returns:
      Bytes from the wrapped stream.

    Raises:
      CommandException if the position of the wrapped stream is unknown.
    """
    if self._seek_away is not None:
      raise CommandException('Read called on hashing file pointer in an '
                             'unknown position; cannot correctly compute '
                             'digest.')

    data = self._orig_fp.read(size)
    self._digesters_previous_mark = self._digesters_current_mark
    for alg in self._digesters:
      self._digesters_previous[alg] = self._digesters[alg].copy()
      self._digesters[alg].update(data)
    self._digesters_current_mark += len(data)
    return data

  def tell(self):  # pylint: disable=invalid-name
    """Returns the current stream position."""
    return self._orig_fp.tell()

  def seekable(self):  # pylint: disable=invalid-name
    """Returns True if the stream is seekable."""
    return self._orig_fp.seekable()

  def seek(self, offset, whence=os.SEEK_SET):  # pylint: disable=invalid-name
    """Seeks in the wrapped file pointer and catches up hash digests.

    Args:
      offset: The offset to seek to.
      whence: os.SEEK_SET, os.SEEK_CUR, or os.SEEK_END.

    Returns:
      Return value from the wrapped stream's seek call.
    """
    if whence != os.SEEK_SET:
      # We do not catch up hashes for non-absolute seeks, and rely on the
      # caller to seek to an absolute position before reading.
      self._seek_away = self._orig_fp.tell()

    else:
      # Hashes will be correct and it's safe to call read().
      self._seek_away = None
      if offset < self._digesters_previous_mark:
        # This is earlier than our earliest saved digest, so we need to
        # reset the digesters and scan from the beginning.
        for alg in self._digesters:
          self._digesters[alg] = self._hash_algs[alg]()
        self._digesters_current_mark = 0
        self._orig_fp.seek(0)
        self._CatchUp(offset)

      elif offset == self._digesters_previous_mark:
        # Just load the saved digests.
        self._digesters_current_mark = self._digesters_previous_mark
        for alg in self._digesters:
          self._digesters[alg] = self._digesters_previous[alg]

      elif offset < self._digesters_current_mark:
        # Reset the position to our previous digest and scan forward.
        self._digesters_current_mark = self._digesters_previous_mark
        for alg in self._digesters:
          self._digesters[alg] = self._digesters_previous[alg]
        self._orig_fp.seek(self._digesters_previous_mark)
        self._CatchUp(offset - self._digesters_previous_mark)

      else:
        # Scan forward from our current digest and position.
        self._orig_fp.seek(self._digesters_current_mark)
        self._CatchUp(offset - self._digesters_current_mark)

    return self._orig_fp.seek(offset, whence)

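  # A worked example of the catch-up logic above (illustrative): suppose the
  # last read() started at byte 0 and consumed 100 bytes, so
  # _digesters_previous_mark == 0 and _digesters_current_mark == 100. A
  # seek(50, os.SEEK_SET) restores the saved digests from mark 0, seeks the
  # wrapped stream to 0, and calls _CatchUp(50) to re-hash bytes 0-49; a
  # seek(150, os.SEEK_SET) instead calls _CatchUp(50) from mark 100, with no
  # re-hashing of earlier bytes.
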
  def _CatchUp(self, bytes_to_read):
    """Catches up hashes, but does not return data and uses little memory.

    Before calling this function, self._digesters_current_mark must equal the
    current position of the original stream, and self._digesters must be
    current to that point (but no further).

    Args:
      bytes_to_read: Number of bytes to catch up from the original stream.
    """
    if self._orig_fp.tell() != self._digesters_current_mark:
      raise CommandException(
          'Invalid mark when catching up hashes. Stream position %s, hash '
          'position %s' % (self._orig_fp.tell(), self._digesters_current_mark))

    for alg in self._digesters:
      if bytes_to_read >= MIN_SIZE_COMPUTE_LOGGING:
        self._logger.info('Catching up %s for %s...', alg,
                          self._src_url.url_string)
      self._digesters_previous[alg] = self._digesters[alg].copy()

    self._digesters_previous_mark = self._digesters_current_mark
    bytes_remaining = bytes_to_read
    bytes_this_round = min(bytes_remaining, TRANSFER_BUFFER_SIZE)
    while bytes_this_round:
      data = self._orig_fp.read(bytes_this_round)
      bytes_remaining -= bytes_this_round
      for alg in self._digesters:
        self._digesters[alg].update(data)
      bytes_this_round = min(bytes_remaining, TRANSFER_BUFFER_SIZE)
    self._digesters_current_mark += bytes_to_read
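

# A usage sketch for HashingFileUploadWrapper (src_url and logger are
# hypothetical stand-ins for a gslib FileUrl and a logging.Logger; the file
# path is also hypothetical):
#
#   from hashlib import md5
#   hash_algs = {'md5': md5}
#   digesters = {'md5': md5()}
#   wrapped = HashingFileUploadWrapper(open('some_file', 'rb'), digesters,
#                                      hash_algs, src_url, logger)
#   data = wrapped.read(TRANSFER_BUFFER_SIZE)  # digesters updated as we read
#   wrapped.seek(0)  # digesters are reset / caught up to the new position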