# -*- coding: utf-8 -*-
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper functions for hashing functionality."""

import base64
import binascii
from hashlib import md5
import os

from boto import config
import crcmod

from gslib.exception import CommandException
from gslib.util import DEFAULT_FILE_BUFFER_SIZE
from gslib.util import MIN_SIZE_COMPUTE_LOGGING
from gslib.util import TRANSFER_BUFFER_SIZE
from gslib.util import UsingCrcmodExtension


SLOW_CRCMOD_WARNING = """
WARNING: You have requested checksumming but your crcmod installation isn't
using the module's C extension, so checksumming will run very slowly. For help
installing the extension, please see:
  $ gsutil help crcmod
"""


_SLOW_CRCMOD_DOWNLOAD_WARNING = """
WARNING: Downloading this composite object requires integrity checking with
CRC32c, but your crcmod installation isn't using the module's C extension,
so the hash computation will likely throttle download performance. For help
installing the extension, please see:
  $ gsutil help crcmod
To disable slow integrity checking, see the "check_hashes" option in your
boto config file.
"""

_SLOW_CRC_EXCEPTION_TEXT = """
Downloading this composite object requires integrity checking with CRC32c,
but your crcmod installation isn't using the module's C extension, so the
hash computation will likely throttle download performance. For help
installing the extension, please see:

  $ gsutil help crcmod

To download regardless of crcmod performance or to skip slow integrity
checks, see the "check_hashes" option in your boto config file.

NOTE: It is strongly recommended that you not disable integrity checks.
Doing so could allow data corruption to go undetected during
uploading/downloading."""


_NO_HASH_CHECK_WARNING = """
WARNING: This download will not be validated since your crcmod installation
doesn't use the module's C extension, so the hash computation would likely
throttle download performance. For help in installing the extension, please
see:
  $ gsutil help crcmod
To force integrity checking, see the "check_hashes" option in your boto config
file.
"""


# Configuration values for hashing.
CHECK_HASH_IF_FAST_ELSE_FAIL = 'if_fast_else_fail'
CHECK_HASH_IF_FAST_ELSE_SKIP = 'if_fast_else_skip'
CHECK_HASH_ALWAYS = 'always'
CHECK_HASH_NEVER = 'never'


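# For reference, these values are read from the boto config file's [GSUtil]
# section (a sketch; any of the CHECK_HASH_* values above is valid):
#
#   [GSUtil]
#   check_hashes = if_fast_else_fail

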
def _CalculateHashFromContents(fp, hash_alg):
  """Calculates a hex digest of the contents of a seekable stream.

  This function resets the file pointer to position 0.

  Args:
    fp: An already-open file object.
    hash_alg: Instance of hashing class initialized to start state.

  Returns:
    Hash of the stream in hex string format.
  """
  hash_dict = {'placeholder': hash_alg}
  fp.seek(0)
  CalculateHashesFromContents(fp, hash_dict)
  fp.seek(0)
  return hash_dict['placeholder'].hexdigest()


def CalculateHashesFromContents(fp, hash_dict, callback_processor=None):
  """Calculates hashes of the contents of a file.

  Args:
    fp: An already-open file object (stream will be consumed).
    hash_dict: Dict of (string alg_name: initialized hashing object); each
        hashing object is updated with the stream's contents upon return.
    callback_processor: Optional callback processing class that implements
        Progress(integer amount of bytes processed).
  """
  while True:
    data = fp.read(DEFAULT_FILE_BUFFER_SIZE)
    if not data:
      break
    for hash_alg in hash_dict.itervalues():
      hash_alg.update(data)
    if callback_processor:
      callback_processor.Progress(len(data))


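# Example (an illustrative sketch; the path below is hypothetical): computing
# MD5 and CRC32c digests of a local file in a single pass over its contents:
#
#   digests = {'md5': md5(), 'crc32c': crcmod.predefined.Crc('crc-32c')}
#   with open('/tmp/data.bin', 'rb') as fp:
#     CalculateHashesFromContents(fp, digests)
#   print digests['md5'].hexdigest(), digests['crc32c'].hexdigest()

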
def CalculateB64EncodedCrc32cFromContents(fp):
  """Calculates a base64 CRC32c checksum of the contents of a seekable stream.

  This function sets the stream position to 0 before and after calculation.

  Args:
    fp: An already-open file object.

  Returns:
    CRC32c checksum of the file in base64 format.
  """
  return _CalculateB64EncodedHashFromContents(
      fp, crcmod.predefined.Crc('crc-32c'))


def CalculateB64EncodedMd5FromContents(fp):
  """Calculates a base64 MD5 digest of the contents of a seekable stream.

  This function sets the stream position to 0 before and after calculation.

  Args:
    fp: An already-open file object.

  Returns:
    MD5 digest of the file in base64 format.
  """
  return _CalculateB64EncodedHashFromContents(fp, md5())


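# Example (a minimal sketch; the path is hypothetical). The base64 digest form
# is what cloud object metadata typically stores, so this is the value to
# compare against a stored object hash:
#
#   with open('/tmp/object.bin', 'rb') as fp:
#     b64_md5 = CalculateB64EncodedMd5FromContents(fp)

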
def CalculateMd5FromContents(fp):
  """Calculates a hex MD5 digest of the contents of a seekable stream.

  This function sets the stream position to 0 before and after calculation.

  Args:
    fp: An already-open file object.

  Returns:
    MD5 digest of the file in hex format.
  """
  return _CalculateHashFromContents(fp, md5())


def Base64EncodeHash(digest_value):
  """Returns the base64-encoded version of the input hex digest value."""
  return base64.encodestring(binascii.unhexlify(digest_value)).rstrip('\n')


def Base64ToHexHash(base64_hash):
  """Returns the hex digest value of the input base64-encoded hash.

  Args:
    base64_hash: Base64-encoded hash, which may contain newlines and single or
        double quotes.

  Returns:
    Hex digest of the input argument.
  """
  return binascii.hexlify(base64.decodestring(base64_hash.strip('\n"\'')))


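# The two helpers above are inverses of each other; a quick sketch of the
# round trip:
#
#   hex_digest = CalculateMd5FromContents(fp)
#   b64_digest = Base64EncodeHash(hex_digest)
#   assert Base64ToHexHash(b64_digest) == hex_digest

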
def _CalculateB64EncodedHashFromContents(fp, hash_alg):
  """Calculates a base64 digest of the contents of a seekable stream.

  This function sets the stream position to 0 before and after calculation.

  Args:
    fp: An already-open file object.
    hash_alg: Instance of hashing class initialized to start state.

  Returns:
    Hash of the stream in base64 format.
  """
  return Base64EncodeHash(_CalculateHashFromContents(fp, hash_alg))


def GetUploadHashAlgs():
  """Returns a dict of hash algorithms for validating an uploaded object.

  This is for use only with single object uploads, not compose operations
  such as those used by parallel composite uploads (though it can be used to
  validate the individual components).

  Returns:
    dict of (algorithm_name: hash_algorithm)
  """
  check_hashes_config = config.get(
      'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL)
  if check_hashes_config == CHECK_HASH_NEVER:
    return {}
  return {'md5': md5}


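# A quick sketch of the returned mapping: unless check_hashes is 'never',
# uploads are validated with MD5:
#
#   algs = GetUploadHashAlgs()   # {'md5': md5}, or {} if check_hashes = never

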
def GetDownloadHashAlgs(logger, src_has_md5=False, src_has_crc32c=False):
  """Returns a dict of hash algorithms for validating a downloaded object.

  Args:
    logger: logging.Logger for outputting log messages.
    src_has_md5: If True, the source object has an MD5 hash.
    src_has_crc32c: If True, the source object has a CRC32c hash.

  Returns:
    Dict of (string, hash algorithm).

  Raises:
    CommandException if hash algorithms satisfying the boto config file
    cannot be returned.
  """
  check_hashes_config = config.get(
      'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL)
  if check_hashes_config == CHECK_HASH_NEVER:
    return {}

  hash_algs = {}
  if src_has_md5:
    hash_algs['md5'] = md5
  elif src_has_crc32c:
    # If the cloud provider supplies a CRC, we'll compute a checksum to
    # validate if we're using a native crcmod installation and MD5 isn't
    # offered as an alternative.
    if UsingCrcmodExtension(crcmod):
      hash_algs['crc32c'] = lambda: crcmod.predefined.Crc('crc-32c')
    elif not hash_algs:
      if check_hashes_config == CHECK_HASH_IF_FAST_ELSE_FAIL:
        raise CommandException(_SLOW_CRC_EXCEPTION_TEXT)
      elif check_hashes_config == CHECK_HASH_IF_FAST_ELSE_SKIP:
        logger.warn(_NO_HASH_CHECK_WARNING)
      elif check_hashes_config == CHECK_HASH_ALWAYS:
        logger.warn(_SLOW_CRCMOD_DOWNLOAD_WARNING)
        hash_algs['crc32c'] = lambda: crcmod.predefined.Crc('crc-32c')
      else:
        raise CommandException(
            'Your boto config \'check_hashes\' option is misconfigured.')

  return hash_algs


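# Example (an illustrative sketch): the values in the returned dict are
# zero-argument constructors, so callers create fresh digesters per download:
#
#   algs = GetDownloadHashAlgs(logger, src_has_crc32c=True)
#   digesters = dict((alg_name, algs[alg_name]()) for alg_name in algs)

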
class HashingFileUploadWrapper(object):
  """Wraps an input stream in a hash digester and exposes a stream interface.

  This class provides integrity checking during file uploads via the
  following properties:

  Calls to read will appropriately update digesters with all bytes read.
  Calls to seek (assuming it is supported by the wrapped stream) using
      os.SEEK_SET will catch up / reset the digesters to the specified
      position. If seek is called with a different os.SEEK mode, the caller
      must return to the original position using os.SEEK_SET before further
      reads.
  Calls to seek are fast if the desired position is equal to the position at
      the beginning of the last read call (we only need to re-hash bytes
      from that point on).
  """

  def __init__(self, stream, digesters, hash_algs, src_url, logger):
    """Initializes the wrapper.

    Args:
      stream: Input stream.
      digesters: dict of {string: hash digester} containing digesters, where
          string is the name of the hash algorithm.
      hash_algs: dict of {string: hash algorithm} for resetting and
          recalculating digesters. String is the name of the hash algorithm.
      src_url: Source FileUrl that is being copied.
      logger: For outputting log messages.
    """
    if not digesters:
      raise CommandException('HashingFileUploadWrapper used with no digesters.')
    elif not hash_algs:
      raise CommandException('HashingFileUploadWrapper used with no hash_algs.')

    self._orig_fp = stream
    self._digesters = digesters
    self._src_url = src_url
    self._logger = logger
    self._seek_away = None

    self._digesters_previous = {}
    for alg in self._digesters:
      self._digesters_previous[alg] = self._digesters[alg].copy()
    self._digesters_previous_mark = 0
    self._digesters_current_mark = 0
    self._hash_algs = hash_algs

  def read(self, size=-1):  # pylint: disable=invalid-name
    """Reads from the wrapped file pointer and calculates hash digests.

    Args:
      size: The number of bytes to read. If omitted or negative, the entire
          contents of the file will be read, hashed, and returned.

    Returns:
      Bytes from the wrapped stream.

    Raises:
      CommandException if the position of the wrapped stream is unknown.
    """
    if self._seek_away is not None:
      raise CommandException('Read called on hashing file pointer in an '
                             'unknown position; cannot correctly compute '
                             'digest.')

    data = self._orig_fp.read(size)
    self._digesters_previous_mark = self._digesters_current_mark
    for alg in self._digesters:
      self._digesters_previous[alg] = self._digesters[alg].copy()
      self._digesters[alg].update(data)
    self._digesters_current_mark += len(data)
    return data

  def tell(self):  # pylint: disable=invalid-name
    """Returns the current stream position."""
    return self._orig_fp.tell()

  def seekable(self):  # pylint: disable=invalid-name
    """Returns true if the stream is seekable."""
    return self._orig_fp.seekable()

  def seek(self, offset, whence=os.SEEK_SET):  # pylint: disable=invalid-name
    """Seeks in the wrapped file pointer and catches up hash digests.

    Args:
      offset: The offset to seek to.
      whence: os.SEEK_SET, os.SEEK_CUR, or os.SEEK_END.

    Returns:
      Return value from the wrapped stream's seek call.
    """
    if whence != os.SEEK_SET:
      # We do not catch up hashes for non-absolute seeks, and rely on the
      # caller to seek to an absolute position before reading.
      self._seek_away = self._orig_fp.tell()

    else:
      # Hashes will be correct and it's safe to call read().
      self._seek_away = None
      if offset < self._digesters_previous_mark:
        # This is earlier than our earliest saved digest, so we need to
        # reset the digesters and scan from the beginning.
        for alg in self._digesters:
          self._digesters[alg] = self._hash_algs[alg]()
        self._digesters_current_mark = 0
        self._orig_fp.seek(0)
        self._CatchUp(offset)

      elif offset == self._digesters_previous_mark:
        # Just load the saved digests.
        self._digesters_current_mark = self._digesters_previous_mark
        for alg in self._digesters:
          self._digesters[alg] = self._digesters_previous[alg]

      elif offset < self._digesters_current_mark:
        # Reset the position to our previous digest and scan forward.
        self._digesters_current_mark = self._digesters_previous_mark
        for alg in self._digesters:
          self._digesters[alg] = self._digesters_previous[alg]
        self._orig_fp.seek(self._digesters_previous_mark)
        self._CatchUp(offset - self._digesters_previous_mark)

      else:
        # Scan forward from our current digest and position.
        self._orig_fp.seek(self._digesters_current_mark)
        self._CatchUp(offset - self._digesters_current_mark)

    return self._orig_fp.seek(offset, whence)

  def _CatchUp(self, bytes_to_read):
    """Catches up hashes, but does not return data and uses little memory.

    Before calling this function, digesters_current_mark should be updated
    to the current location of the original stream and the self._digesters
    should be current to that point (but no further).

    Args:
      bytes_to_read: Number of bytes to catch up from the original stream.
    """
    if self._orig_fp.tell() != self._digesters_current_mark:
      raise CommandException(
          'Invalid mark when catching up hashes. Stream position %s, hash '
          'position %s' % (self._orig_fp.tell(), self._digesters_current_mark))

    for alg in self._digesters:
      if bytes_to_read >= MIN_SIZE_COMPUTE_LOGGING:
        self._logger.info('Catching up %s for %s...', alg,
                          self._src_url.url_string)
      self._digesters_previous[alg] = self._digesters[alg].copy()

    self._digesters_previous_mark = self._digesters_current_mark
    bytes_remaining = bytes_to_read
    bytes_this_round = min(bytes_remaining, TRANSFER_BUFFER_SIZE)
    while bytes_this_round:
      data = self._orig_fp.read(bytes_this_round)
      bytes_remaining -= bytes_this_round
      for alg in self._digesters:
        self._digesters[alg].update(data)
      bytes_this_round = min(bytes_remaining, TRANSFER_BUFFER_SIZE)
    self._digesters_current_mark += bytes_to_read
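

# Example (a minimal sketch; the path, src_url object, and logger below are
# hypothetical): digesters track exactly the bytes the caller has read, even
# if a retry seeks the stream backwards before re-reading:
#
#   digesters = {'md5': md5()}
#   hash_algs = {'md5': md5}
#   with open('/tmp/upload.bin', 'rb') as fp:
#     wrapper = HashingFileUploadWrapper(fp, digesters, hash_algs,
#                                        src_url, logger)
#     while wrapper.read(TRANSFER_BUFFER_SIZE):
#       pass
#   upload_md5 = digesters['md5'].hexdigest()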