| OLD | NEW |
| (Empty) | |
| 1 # -*- coding: utf-8 -*- |
| 2 # Copyright 2014 Google Inc. All Rights Reserved. |
| 3 # |
| 4 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 # you may not use this file except in compliance with the License. |
| 6 # You may obtain a copy of the License at |
| 7 # |
| 8 # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 # |
| 10 # Unless required by applicable law or agreed to in writing, software |
| 11 # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 # See the License for the specific language governing permissions and |
| 14 # limitations under the License. |
| 15 """Helper functions for hashing functionality.""" |
| 16 |
| 17 import base64 |
| 18 import binascii |
| 19 from hashlib import md5 |
| 20 import os |
| 21 |
| 22 from boto import config |
| 23 import crcmod |
| 24 |
| 25 from gslib.commands.config import CHECK_HASH_ALWAYS |
| 26 from gslib.commands.config import CHECK_HASH_IF_FAST_ELSE_FAIL |
| 27 from gslib.commands.config import CHECK_HASH_IF_FAST_ELSE_SKIP |
| 28 from gslib.commands.config import CHECK_HASH_NEVER |
| 29 from gslib.exception import CommandException |
| 30 from gslib.util import DEFAULT_FILE_BUFFER_SIZE |
| 31 from gslib.util import MIN_SIZE_COMPUTE_LOGGING |
| 32 from gslib.util import TRANSFER_BUFFER_SIZE |
| 33 from gslib.util import UsingCrcmodExtension |
| 34 |
| 35 |
| 36 SLOW_CRCMOD_WARNING = """ |
| 37 WARNING: You have requested checksumming but your crcmod installation isn't |
| 38 using the module's C extension, so checksumming will run very slowly. For help |
| 39 installing the extension, please see: |
| 40 $ gsutil help crcmod |
| 41 """ |
| 42 |
| 43 |
| 44 _SLOW_CRCMOD_DOWNLOAD_WARNING = """ |
| 45 WARNING: Downloading this composite object requires integrity checking with |
| 46 CRC32c, but your crcmod installation isn't using the module's C extension, |
| 47 so the hash computation will likely throttle download performance. For help |
| 48 installing the extension, please see: |
| 49 $ gsutil help crcmod |
| 50 To disable slow integrity checking, see the "check_hashes" option in your |
| 51 boto config file. |
| 52 """ |
| 53 |
| 54 _SLOW_CRC_EXCEPTION_TEXT = """ |
| 55 Downloading this composite object requires integrity checking with CRC32c, |
| 56 but your crcmod installation isn't using the module's C extension, so the |
| 57 hash computation will likely throttle download performance. For help |
| 58 installing the extension, please see: |
| 59 |
| 60 $ gsutil help crcmod |
| 61 |
| 62 To download regardless of crcmod performance or to skip slow integrity |
| 63 checks, see the "check_hashes" option in your boto config file. |
| 64 |
| 65 NOTE: It is strongly recommended that you not disable integrity checks. Doing so |
| 66 could allow data corruption to go undetected during uploading/downloading.""" |
| 67 |
| 68 |
| 69 _NO_HASH_CHECK_WARNING = """ |
| 70 WARNING: This download will not be validated since your crcmod installation |
| 71 doesn't use the module's C extension, so the hash computation would likely |
| 72 throttle download performance. For help in installing the extension, please |
| 73 see: |
| 74 $ gsutil help crcmod |
| 75 To force integrity checking, see the "check_hashes" option in your boto config |
| 76 file. |
| 77 """ |
| 78 |
| 79 |
def _CalculateHashFromContents(fp, hash_alg):
  """Computes the hex digest of a seekable stream's entire contents.

  The stream position is reset to 0 both before and after the computation.

  Args:
    fp: An already-open file object.
    hash_alg: Instance of hashing class initialized to start state.

  Returns:
    Hash of the stream in hex string format.
  """
  # CalculateHashesFromContents operates on a dict of digesters; wrap the
  # single digester under a throwaway key.
  digesters = {'placeholder': hash_alg}
  fp.seek(0)
  CalculateHashesFromContents(fp, digesters)
  fp.seek(0)
  return digesters['placeholder'].hexdigest()
| 97 |
| 98 |
def CalculateHashesFromContents(fp, hash_dict, callback_processor=None):
  """Calculates hashes of the contents of a file.

  The stream is consumed from its current position to EOF; the caller is
  responsible for any seeking before/after this call.

  Args:
    fp: An already-open file object (stream will be consumed).
    hash_dict: Dict of (string alg_name: initialized hashing class)
        Hashing class will be populated with digests upon return.
    callback_processor: Optional callback processing class that implements
        Progress(integer amount of bytes processed).
  """
  while True:
    data = fp.read(DEFAULT_FILE_BUFFER_SIZE)
    if not data:
      break
    # Use values() rather than the Python 2-only itervalues() so this also
    # runs under Python 3; hash_dict is small, so the extra list in Python 2
    # is negligible.
    for hash_alg in hash_dict.values():
      hash_alg.update(data)
    if callback_processor:
      callback_processor.Progress(len(data))
| 117 |
| 118 |
def CalculateB64EncodedCrc32cFromContents(fp):
  """Computes a base64-encoded CRC32c checksum of a seekable stream.

  This function sets the stream position 0 before and after calculation.

  Args:
    fp: An already-open file object.

  Returns:
    CRC32c checksum of the file in base64 format.
  """
  crc_digester = crcmod.predefined.Crc('crc-32c')
  return _CalculateB64EncodedHashFromContents(fp, crc_digester)
| 132 |
| 133 |
def CalculateB64EncodedMd5FromContents(fp):
  """Computes a base64-encoded MD5 digest of a seekable stream.

  This function sets the stream position 0 before and after calculation.

  Args:
    fp: An already-open file object.

  Returns:
    MD5 digest of the file in base64 format.
  """
  md5_digester = md5()
  return _CalculateB64EncodedHashFromContents(fp, md5_digester)
| 146 |
| 147 |
def CalculateMd5FromContents(fp):
  """Computes a hex MD5 digest of the contents of a seekable stream.

  This function sets the stream position 0 before and after calculation.

  Args:
    fp: An already-open file object.

  Returns:
    MD5 digest of the file in hex format.
  """
  md5_digester = md5()
  return _CalculateHashFromContents(fp, md5_digester)
| 160 |
| 161 |
def Base64EncodeHash(digest_value):
  """Returns the base64-encoded version of the input hex digest value.

  Args:
    digest_value: Hex string digest (e.g. from hashlib's hexdigest()).

  Returns:
    Base64-encoded digest with no trailing newline.
  """
  # base64.b64encode never inserts newlines, unlike the deprecated
  # base64.encodestring (removed in Python 3.9), which wrapped output at 76
  # characters. The previous rstrip('\n') only removed the trailing newline,
  # so digests longer than 57 bytes (e.g. SHA-512) would have retained an
  # embedded newline; b64encode is exact for all digest sizes.
  return base64.b64encode(binascii.unhexlify(digest_value))
| 165 |
| 166 |
def _CalculateB64EncodedHashFromContents(fp, hash_alg):
  """Computes a base64-encoded digest of a seekable stream's contents.

  This function sets the stream position 0 before and after calculation.

  Args:
    fp: An already-open file object.
    hash_alg: Instance of hashing class initialized to start state.

  Returns:
    Hash of the stream in base64 format.
  """
  hex_digest = _CalculateHashFromContents(fp, hash_alg)
  return Base64EncodeHash(hex_digest)
| 180 |
| 181 |
def GetUploadHashAlgs():
  """Returns a dict of hash algorithms for validating an uploaded object.

  This is for use only with single object uploads, not compose operations
  such as those used by parallel composite uploads (though it can be used to
  validate the individual components).

  Returns:
    dict of (algorithm_name: hash_algorithm)
  """
  check_hashes_config = config.get(
      'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL)
  # Compare against the CHECK_HASH_NEVER constant (previously the literal
  # 'never') for consistency with GetDownloadHashAlgs.
  if check_hashes_config == CHECK_HASH_NEVER:
    return {}
  return {'md5': md5}
| 197 |
| 198 |
def GetDownloadHashAlgs(logger, src_has_md5=False, src_has_crc32c=False):
  """Returns a dict of hash algorithms for validating an object.

  MD5 is preferred when the source offers it; CRC32c is used only when it is
  the sole hash available, subject to the boto "check_hashes" config value.

  Args:
    logger: logging.Logger for outputting log messages.
    src_has_md5: If True, source object has an md5 hash.
    src_has_crc32c: If True, source object has a crc32c hash.

  Returns:
    Dict of (string, hash algorithm).

  Raises:
    CommandException if hash algorithms satisfying the boto config file
    cannot be returned.
  """
  check_hashes_config = config.get(
      'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL)
  if check_hashes_config == CHECK_HASH_NEVER:
    return {}

  hash_algs = {}
  if src_has_md5:
    hash_algs['md5'] = md5
  elif src_has_crc32c:
    # If the cloud provider supplies a CRC, we'll compute a checksum to
    # validate if we're using a native crcmod installation and MD5 isn't
    # offered as an alternative.
    if UsingCrcmodExtension(crcmod):
      hash_algs['crc32c'] = lambda: crcmod.predefined.Crc('crc-32c')
    # NOTE(review): hash_algs is always empty on this path (it is only
    # populated in the md5 branch above), so this elif behaves as an else.
    elif not hash_algs:
      # crcmod lacks its C extension; honor the configured trade-off between
      # integrity checking and download speed.
      if check_hashes_config == CHECK_HASH_IF_FAST_ELSE_FAIL:
        raise CommandException(_SLOW_CRC_EXCEPTION_TEXT)
      elif check_hashes_config == CHECK_HASH_IF_FAST_ELSE_SKIP:
        logger.warn(_NO_HASH_CHECK_WARNING)
      elif check_hashes_config == CHECK_HASH_ALWAYS:
        # Warn, but validate anyway despite the slow pure-Python CRC32c.
        logger.warn(_SLOW_CRCMOD_DOWNLOAD_WARNING)
        hash_algs['crc32c'] = lambda: crcmod.predefined.Crc('crc-32c')
      else:
        raise CommandException(
            'Your boto config \'check_hashes\' option is misconfigured.')

  return hash_algs
| 241 |
| 242 |
class HashingFileUploadWrapper(object):
  """Wraps an input stream in a hash digester and exposes a stream interface.

  This class provides integrity checking during file uploads via the
  following properties:

  Calls to read will appropriately update digesters with all bytes read.
  Calls to seek (assuming it is supported by the wrapped stream) using
      os.SEEK_SET will catch up / reset the digesters to the specified
      position. If seek is called with a different os.SEEK mode, the caller
      must return to the original position using os.SEEK_SET before further
      reads.
  Calls to seek are fast if the desired position is equal to the position at
      the beginning of the last read call (we only need to re-hash bytes
      from that point on).
  """

  def __init__(self, stream, digesters, hash_algs, src_url, logger):
    """Initializes the wrapper.

    Args:
      stream: Input stream.
      digesters: dict of {string: hash digester} containing digesters, where
          string is the name of the hash algorithm.
      hash_algs: dict of {string: hash algorithm} for resetting and
          recalculating digesters. String is the name of the hash algorithm.
      src_url: Source FileUrl that is being copied.
      logger: For outputting log messages.

    Raises:
      CommandException: If digesters or hash_algs is empty.
    """
    if not digesters:
      raise CommandException('HashingFileUploadWrapper used with no digesters.')
    elif not hash_algs:
      raise CommandException('HashingFileUploadWrapper used with no hash_algs.')

    self._orig_fp = stream
    self._digesters = digesters
    self._src_url = src_url
    self._logger = logger
    # Set to the wrapped stream's position when a non-absolute seek is
    # performed; while set, read() raises until an absolute seek restores a
    # known position.
    self._seek_away = None

    # Snapshot of the digesters as of _digesters_previous_mark, kept so a
    # backward seek to that offset can restore state without re-hashing from
    # the start of the stream.
    self._digesters_previous = {}
    for alg in self._digesters:
      self._digesters_previous[alg] = self._digesters[alg].copy()
    # Stream offset that _digesters_previous corresponds to.
    self._digesters_previous_mark = 0
    # Stream offset through which self._digesters have consumed bytes.
    self._digesters_current_mark = 0
    self._hash_algs = hash_algs

  def read(self, size=-1):  # pylint: disable=invalid-name
    """Reads from the wrapped file pointer and calculates hash digests.

    Args:
      size: The amount of bytes to read. If omitted or negative, the entire
          contents of the file will be read, hashed, and returned.

    Returns:
      Bytes from the wrapped stream.

    Raises:
      CommandException if the position of the wrapped stream is unknown.
    """
    if self._seek_away is not None:
      raise CommandException('Read called on hashing file pointer in an '
                             'unknown position; cannot correctly compute '
                             'digest.')

    data = self._orig_fp.read(size)
    # Save the pre-read digester state so a later seek back to this offset
    # can restore it instead of re-hashing from offset 0.
    self._digesters_previous_mark = self._digesters_current_mark
    for alg in self._digesters:
      self._digesters_previous[alg] = self._digesters[alg].copy()
      if len(data) >= MIN_SIZE_COMPUTE_LOGGING:
        # Large read; let the user know hashing may take a while.
        self._logger.info('Catching up %s for %s...', alg,
                          self._src_url.url_string)
      self._digesters[alg].update(data)
    self._digesters_current_mark += len(data)
    return data

  def tell(self):  # pylint: disable=invalid-name
    """Returns the current stream position."""
    return self._orig_fp.tell()

  def seekable(self):  # pylint: disable=invalid-name
    """Returns true if the stream is seekable."""
    return self._orig_fp.seekable()

  def seek(self, offset, whence=os.SEEK_SET):  # pylint: disable=invalid-name
    """Seeks in the wrapped file pointer and catches up hash digests.

    Args:
      offset: The offset to seek to.
      whence: os.SEEK_SET, os.SEEK_CUR, or os.SEEK_END.

    Returns:
      Return value from the wrapped stream's seek call.
    """
    if whence != os.SEEK_SET:
      # We do not catch up hashes for non-absolute seeks, and rely on the
      # caller to seek to an absolute position before reading.
      self._seek_away = self._orig_fp.tell()

    else:
      # Hashes will be correct and it's safe to call read().
      self._seek_away = None
      if offset < self._digesters_previous_mark:
        # This is earlier than our earliest saved digest, so we need to
        # reset the digesters and scan from the beginning.
        for alg in self._digesters:
          self._digesters[alg] = self._hash_algs[alg]()
        self._digesters_current_mark = 0
        self._orig_fp.seek(0)
        self._CatchUp(offset)

      elif offset == self._digesters_previous_mark:
        # Just load the saved digests.
        self._digesters_current_mark = self._digesters_previous_mark
        for alg in self._digesters:
          self._digesters[alg] = self._digesters_previous[alg]

      elif offset < self._digesters_current_mark:
        # Reset the position to our previous digest and scan forward.
        self._digesters_current_mark = self._digesters_previous_mark
        for alg in self._digesters:
          self._digesters[alg] = self._digesters_previous[alg]
        self._orig_fp.seek(self._digesters_previous_mark)
        self._CatchUp(offset - self._digesters_previous_mark)

      else:
        # Scan forward from our current digest and position.
        self._orig_fp.seek(self._digesters_current_mark)
        self._CatchUp(offset - self._digesters_current_mark)

    return self._orig_fp.seek(offset, whence)

  def _CatchUp(self, bytes_to_read):
    """Catches up hashes, but does not return data and uses little memory.

    Before calling this function, digesters_current_mark should be updated
    to the current location of the original stream and the self._digesters
    should be current to that point (but no further).

    Args:
      bytes_to_read: Number of bytes to catch up from the original stream.

    Raises:
      CommandException: If the stream position disagrees with the digester
          mark (the digests could not be computed correctly).
    """
    if self._orig_fp.tell() != self._digesters_current_mark:
      raise CommandException(
          'Invalid mark when catching up hashes. Stream position %s, hash '
          'position %s' % (self._orig_fp.tell(), self._digesters_current_mark))

    for alg in self._digesters:
      if bytes_to_read >= MIN_SIZE_COMPUTE_LOGGING:
        self._logger.info('Catching up %s for %s...', alg,
                          self._src_url.url_string)
      # Snapshot before consuming so a subsequent backward seek to the
      # pre-catch-up offset can restore these digesters cheaply.
      self._digesters_previous[alg] = self._digesters[alg].copy()

    self._digesters_previous_mark = self._digesters_current_mark
    # Hash in TRANSFER_BUFFER_SIZE chunks rather than reading all
    # bytes_to_read at once, to bound memory use.
    bytes_remaining = bytes_to_read
    bytes_this_round = min(bytes_remaining, TRANSFER_BUFFER_SIZE)
    while bytes_this_round:
      data = self._orig_fp.read(bytes_this_round)
      bytes_remaining -= bytes_this_round
      for alg in self._digesters:
        self._digesters[alg].update(data)
      bytes_this_round = min(bytes_remaining, TRANSFER_BUFFER_SIZE)
    self._digesters_current_mark += bytes_to_read
| OLD | NEW |