Chromium Code Reviews

Side by Side Diff: third_party/gsutil/gslib/hashing_helper.py

Issue 1377933002: [catapult] - Copy Telemetry's gsutilz over to third_party. (Closed) Base URL: https://github.com/catapult-project/catapult.git@master
Patch Set: Rename to gsutil. Created 5 years, 2 months ago
# -*- coding: utf-8 -*-
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper functions for hashing functionality."""

import base64
import binascii
from hashlib import md5
import os

from boto import config
import crcmod

from gslib.exception import CommandException
from gslib.util import DEFAULT_FILE_BUFFER_SIZE
from gslib.util import MIN_SIZE_COMPUTE_LOGGING
from gslib.util import TRANSFER_BUFFER_SIZE
from gslib.util import UsingCrcmodExtension


SLOW_CRCMOD_WARNING = """
WARNING: You have requested checksumming but your crcmod installation isn't
using the module's C extension, so checksumming will run very slowly. For help
installing the extension, please see:
  $ gsutil help crcmod
"""


_SLOW_CRCMOD_DOWNLOAD_WARNING = """
WARNING: Downloading this composite object requires integrity checking with
CRC32c, but your crcmod installation isn't using the module's C extension,
so the hash computation will likely throttle download performance. For help
installing the extension, please see:
  $ gsutil help crcmod
To disable slow integrity checking, see the "check_hashes" option in your
boto config file.
"""

_SLOW_CRC_EXCEPTION_TEXT = """
Downloading this composite object requires integrity checking with CRC32c,
but your crcmod installation isn't using the module's C extension, so the
hash computation will likely throttle download performance. For help
installing the extension, please see:

  $ gsutil help crcmod

To download regardless of crcmod performance or to skip slow integrity
checks, see the "check_hashes" option in your boto config file.

NOTE: It is strongly recommended that you not disable integrity checks. Doing
so could allow data corruption to go undetected during uploading/downloading."""


_NO_HASH_CHECK_WARNING = """
WARNING: This download will not be validated since your crcmod installation
doesn't use the module's C extension, so the hash computation would likely
throttle download performance. For help in installing the extension, please
see:
  $ gsutil help crcmod
To force integrity checking, see the "check_hashes" option in your boto config
file.
"""


# Configuration values for hashing.
CHECK_HASH_IF_FAST_ELSE_FAIL = 'if_fast_else_fail'
CHECK_HASH_IF_FAST_ELSE_SKIP = 'if_fast_else_skip'
CHECK_HASH_ALWAYS = 'always'
CHECK_HASH_NEVER = 'never'
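
# --- Reviewer note (illustrative, not part of the original module) ---
# The CHECK_HASH_* values above are the accepted settings for the
# "check_hashes" option in the [GSUtil] section of the user's boto config,
# e.g.:
#
#   [GSUtil]
#   check_hashes = if_fast_else_skip
#
# A minimal sketch of how this module reads that option (it mirrors the
# config.get calls in GetUploadHashAlgs/GetDownloadHashAlgs below):
def _ExampleCheckHashesPolicy():  # hypothetical helper, illustration only
  return config.get('GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL)
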
def _CalculateHashFromContents(fp, hash_alg):
  """Calculates a hash digest of the contents of a seekable stream.

  This function sets the stream position to 0 before and after calculation.

  Args:
    fp: An already-open file object.
    hash_alg: Instance of hashing class initialized to start state.

  Returns:
    Hash of the stream in hex string format.
  """
  hash_dict = {'placeholder': hash_alg}
  fp.seek(0)
  CalculateHashesFromContents(fp, hash_dict)
  fp.seek(0)
  return hash_dict['placeholder'].hexdigest()

def CalculateHashesFromContents(fp, hash_dict, callback_processor=None):
  """Calculates hashes of the contents of a file.

  Args:
    fp: An already-open file object (stream will be consumed).
    hash_dict: Dict of (string alg_name: initialized hashing class);
        the hashing classes will be populated with digests upon return.
    callback_processor: Optional callback processing class that implements
        Progress(integer amount of bytes processed).
  """
  while True:
    data = fp.read(DEFAULT_FILE_BUFFER_SIZE)
    if not data:
      break
    for hash_alg in hash_dict.itervalues():
      hash_alg.update(data)
    if callback_processor:
      callback_processor.Progress(len(data))

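
# --- Reviewer note (illustrative, not part of the original module) ---
# A minimal usage sketch: computing MD5 and CRC32c digests in a single pass
# over a local file. The path and helper name are hypothetical.
def _ExampleHashFileOnePass(path):
  digests = {'md5': md5(), 'crc32c': crcmod.predefined.Crc('crc-32c')}
  with open(path, 'rb') as fp:
    CalculateHashesFromContents(fp, digests)
  return dict((alg, digest.hexdigest()) for alg, digest in digests.iteritems())
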
def CalculateB64EncodedCrc32cFromContents(fp):
  """Calculates a base64 CRC32c checksum of the contents of a seekable stream.

  This function sets the stream position to 0 before and after calculation.

  Args:
    fp: An already-open file object.

  Returns:
    CRC32c checksum of the file in base64 format.
  """
  return _CalculateB64EncodedHashFromContents(
      fp, crcmod.predefined.Crc('crc-32c'))

def CalculateB64EncodedMd5FromContents(fp):
  """Calculates a base64 MD5 digest of the contents of a seekable stream.

  This function sets the stream position to 0 before and after calculation.

  Args:
    fp: An already-open file object.

  Returns:
    MD5 digest of the file in base64 format.
  """
  return _CalculateB64EncodedHashFromContents(fp, md5())

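
# --- Reviewer note (illustrative, not part of the original module) ---
# A typical integrity check built on the helper above: compare a local file's
# base64 MD5 against the base64 MD5 reported in the source object's metadata.
# The function and parameter names are hypothetical.
def _ExampleValidateDownloadedFile(path, expected_b64_md5):
  with open(path, 'rb') as fp:
    if CalculateB64EncodedMd5FromContents(fp) != expected_b64_md5:
      raise CommandException('MD5 mismatch for %s' % path)
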
def CalculateMd5FromContents(fp):
  """Calculates an MD5 digest of the contents of a seekable stream.

  This function sets the stream position to 0 before and after calculation.

  Args:
    fp: An already-open file object.

  Returns:
    MD5 digest of the file in hex format.
  """
  return _CalculateHashFromContents(fp, md5())

def Base64EncodeHash(digest_value):
  """Returns the base64-encoded version of the input hex digest value."""
  return base64.encodestring(binascii.unhexlify(digest_value)).rstrip('\n')

def Base64ToHexHash(base64_hash):
  """Returns the hex digest value of the input base64-encoded hash.

  Args:
    base64_hash: Base64-encoded hash, which may contain newlines and single or
        double quotes.

  Returns:
    Hex digest of the input argument.
  """
  return binascii.hexlify(base64.decodestring(base64_hash.strip('\n"\'')))

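
# --- Reviewer note (illustrative, not part of the original module) ---
# Round-trip between the hex digests used internally and the base64 form
# stored in object metadata, using the two helpers above:
def _ExampleDigestRoundTrip():
  hex_digest = md5('hello world').hexdigest()
  b64_digest = Base64EncodeHash(hex_digest)
  assert Base64ToHexHash(b64_digest) == hex_digest
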
def _CalculateB64EncodedHashFromContents(fp, hash_alg):
  """Calculates a base64 digest of the contents of a seekable stream.

  This function sets the stream position to 0 before and after calculation.

  Args:
    fp: An already-open file object.
    hash_alg: Instance of hashing class initialized to start state.

  Returns:
    Hash of the stream in base64 format.
  """
  return Base64EncodeHash(_CalculateHashFromContents(fp, hash_alg))

def GetUploadHashAlgs():
  """Returns a dict of hash algorithms for validating an uploaded object.

  This is for use only with single object uploads, not compose operations
  such as those used by parallel composite uploads (though it can be used to
  validate the individual components).

  Returns:
    dict of (algorithm_name: hash_algorithm)
  """
  check_hashes_config = config.get(
      'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL)
  if check_hashes_config == CHECK_HASH_NEVER:
    return {}
  return {'md5': md5}

def GetDownloadHashAlgs(logger, src_has_md5=False, src_has_crc32c=False):
  """Returns a dict of hash algorithms for validating a downloaded object.

  Args:
    logger: logging.Logger for outputting log messages.
    src_has_md5: If True, the source object has an MD5 hash.
    src_has_crc32c: If True, the source object has a CRC32c hash.

  Returns:
    Dict of (string, hash algorithm).

  Raises:
    CommandException if hash algorithms satisfying the boto config file
    cannot be returned.
  """
  check_hashes_config = config.get(
      'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL)
  if check_hashes_config == CHECK_HASH_NEVER:
    return {}

  hash_algs = {}
  if src_has_md5:
    hash_algs['md5'] = md5
  elif src_has_crc32c:
    # The cloud provider supplies a CRC but no MD5 alternative, so validate
    # with CRC32c when a native (C extension) crcmod installation is
    # available; otherwise fall back to the behavior selected by the
    # check_hashes config value.
    if UsingCrcmodExtension(crcmod):
      hash_algs['crc32c'] = lambda: crcmod.predefined.Crc('crc-32c')
    elif not hash_algs:
      if check_hashes_config == CHECK_HASH_IF_FAST_ELSE_FAIL:
        raise CommandException(_SLOW_CRC_EXCEPTION_TEXT)
      elif check_hashes_config == CHECK_HASH_IF_FAST_ELSE_SKIP:
        logger.warn(_NO_HASH_CHECK_WARNING)
      elif check_hashes_config == CHECK_HASH_ALWAYS:
        logger.warn(_SLOW_CRCMOD_DOWNLOAD_WARNING)
        hash_algs['crc32c'] = lambda: crcmod.predefined.Crc('crc-32c')
      else:
        raise CommandException(
            'Your boto config \'check_hashes\' option is misconfigured.')

  return hash_algs

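
# --- Reviewer note (illustrative, not part of the original module) ---
# The dicts returned above map algorithm names to zero-argument constructors,
# so callers create fresh digesters per object, e.g.:
def _ExampleMakeDownloadDigesters(logger):  # hypothetical helper
  hash_algs = GetDownloadHashAlgs(logger, src_has_crc32c=True)
  return dict((alg, hash_algs[alg]()) for alg in hash_algs)
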
class HashingFileUploadWrapper(object):
  """Wraps an input stream in hash digesters and exposes a stream interface.

  This class provides integrity checking during file uploads via the
  following properties:

  Calls to read will appropriately update digesters with all bytes read.
  Calls to seek (assuming it is supported by the wrapped stream) using
      os.SEEK_SET will catch up / reset the digesters to the specified
      position. If seek is called with a different os.SEEK mode, the caller
      must return to the original position using os.SEEK_SET before further
      reads.
  Calls to seek are fast if the desired position is equal to the position at
      the beginning of the last read call (we only need to re-hash bytes
      from that point on).
  """

  def __init__(self, stream, digesters, hash_algs, src_url, logger):
    """Initializes the wrapper.

    Args:
      stream: Input stream.
      digesters: dict of {string: hash digester} containing digesters, where
          string is the name of the hash algorithm.
      hash_algs: dict of {string: hash algorithm} for resetting and
          recalculating digesters. String is the name of the hash algorithm.
      src_url: Source FileUrl that is being copied.
      logger: For outputting log messages.
    """
    if not digesters:
      raise CommandException('HashingFileUploadWrapper used with no digesters.')
    elif not hash_algs:
      raise CommandException('HashingFileUploadWrapper used with no hash_algs.')

    self._orig_fp = stream
    self._digesters = digesters
    self._src_url = src_url
    self._logger = logger
    self._seek_away = None

    self._digesters_previous = {}
    for alg in self._digesters:
      self._digesters_previous[alg] = self._digesters[alg].copy()
    self._digesters_previous_mark = 0
    self._digesters_current_mark = 0
    self._hash_algs = hash_algs

  def read(self, size=-1):  # pylint: disable=invalid-name
    """Reads from the wrapped file pointer and calculates hash digests.

    Args:
      size: The number of bytes to read. If omitted or negative, the entire
          contents of the file will be read, hashed, and returned.

    Returns:
      Bytes from the wrapped stream.

    Raises:
      CommandException if the position of the wrapped stream is unknown.
    """
    if self._seek_away is not None:
      raise CommandException('Read called on hashing file pointer in an '
                             'unknown position; cannot correctly compute '
                             'digest.')

    data = self._orig_fp.read(size)
    self._digesters_previous_mark = self._digesters_current_mark
    for alg in self._digesters:
      self._digesters_previous[alg] = self._digesters[alg].copy()
      self._digesters[alg].update(data)
    self._digesters_current_mark += len(data)
    return data

  def tell(self):  # pylint: disable=invalid-name
    """Returns the current stream position."""
    return self._orig_fp.tell()

  def seekable(self):  # pylint: disable=invalid-name
    """Returns true if the stream is seekable."""
    return self._orig_fp.seekable()

  def seek(self, offset, whence=os.SEEK_SET):  # pylint: disable=invalid-name
    """Seeks in the wrapped file pointer and catches up hash digests.

    Args:
      offset: The offset to seek to.
      whence: One of os.SEEK_SET, os.SEEK_CUR, or os.SEEK_END.

    Returns:
      Return value from the wrapped stream's seek call.
    """
    if whence != os.SEEK_SET:
      # We do not catch up hashes for non-absolute seeks, and rely on the
      # caller to seek to an absolute position before reading.
      self._seek_away = self._orig_fp.tell()

    else:
      # Hashes will be correct and it's safe to call read().
      self._seek_away = None
      if offset < self._digesters_previous_mark:
        # This is earlier than our earliest saved digest, so we need to
        # reset the digesters and scan from the beginning.
        for alg in self._digesters:
          self._digesters[alg] = self._hash_algs[alg]()
        self._digesters_current_mark = 0
        self._orig_fp.seek(0)
        self._CatchUp(offset)

      elif offset == self._digesters_previous_mark:
        # Just load the saved digests.
        self._digesters_current_mark = self._digesters_previous_mark
        for alg in self._digesters:
          self._digesters[alg] = self._digesters_previous[alg]

      elif offset < self._digesters_current_mark:
        # Reset the position to our previous digest and scan forward.
        self._digesters_current_mark = self._digesters_previous_mark
        for alg in self._digesters:
          self._digesters[alg] = self._digesters_previous[alg]
        self._orig_fp.seek(self._digesters_previous_mark)
        self._CatchUp(offset - self._digesters_previous_mark)

      else:
        # Scan forward from our current digest and position.
        self._orig_fp.seek(self._digesters_current_mark)
        self._CatchUp(offset - self._digesters_current_mark)

    return self._orig_fp.seek(offset, whence)

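  # --- Reviewer note (illustrative, not part of the original class) ---
  # Worked example of the absolute-seek cases above, assuming the last read
  # started at offset 100 (previous mark) and ended at offset 200 (current
  # mark):
  #   seek(150): restore the digester copies saved at 100, then re-hash
  #              bytes 100-150 via _CatchUp(50).
  #   seek(100): just restore the saved digester copies.
  #   seek(50):  earlier than any saved state; reset the digesters and
  #              re-hash bytes 0-50 from the start of the stream.
  #   seek(250): hash forward from 200 via _CatchUp(50).
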
  def _CatchUp(self, bytes_to_read):
    """Catches up hashes, but does not return data and uses little memory.

    Before calling this function, digesters_current_mark should be updated
    to the current location of the original stream and self._digesters
    should be current to that point (but no further).

    Args:
      bytes_to_read: Number of bytes to catch up from the original stream.
    """
    if self._orig_fp.tell() != self._digesters_current_mark:
      raise CommandException(
          'Invalid mark when catching up hashes. Stream position %s, hash '
          'position %s' % (self._orig_fp.tell(), self._digesters_current_mark))

    for alg in self._digesters:
      if bytes_to_read >= MIN_SIZE_COMPUTE_LOGGING:
        self._logger.info('Catching up %s for %s...', alg,
                          self._src_url.url_string)
      self._digesters_previous[alg] = self._digesters[alg].copy()

    self._digesters_previous_mark = self._digesters_current_mark
    bytes_remaining = bytes_to_read
    bytes_this_round = min(bytes_remaining, TRANSFER_BUFFER_SIZE)
    while bytes_this_round:
      data = self._orig_fp.read(bytes_this_round)
      bytes_remaining -= bytes_this_round
      for alg in self._digesters:
        self._digesters[alg].update(data)
      bytes_this_round = min(bytes_remaining, TRANSFER_BUFFER_SIZE)
    self._digesters_current_mark += bytes_to_read
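
# --- Reviewer note (illustrative, not part of the original module) ---
# A minimal sketch of driving the wrapper for an upload. src_url only needs
# a url_string attribute here, so a hypothetical stand-in object is used in
# place of a real FileUrl.
class _ExampleUrl(object):
  url_string = 'file://example.bin'


def _ExampleHashWhileUploading(path, logger):  # hypothetical helper
  digesters = {'md5': md5()}
  hash_algs = {'md5': md5}
  wrapped = HashingFileUploadWrapper(open(path, 'rb'), digesters, hash_algs,
                                     _ExampleUrl(), logger)
  while wrapped.read(TRANSFER_BUFFER_SIZE):
    pass  # In real code each chunk would be sent to the service here.
  return digesters['md5'].hexdigest()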
