OLD | NEW |
1 #!/usr/bin/python | 1 #!/usr/bin/python |
2 | 2 |
3 # pylint: disable=C0301 | 3 # pylint: disable=C0301 |
4 """ | 4 """ |
5 Copyright 2014 Google Inc. | 5 Copyright 2014 Google Inc. |
6 | 6 |
7 Use of this source code is governed by a BSD-style license that can be | 7 Use of this source code is governed by a BSD-style license that can be |
8 found in the LICENSE file. | 8 found in the LICENSE file. |
9 | 9 |
10 Utilities for accessing Google Cloud Storage, using the boto library (wrapper | 10 Utilities for accessing Google Cloud Storage, using the boto library (wrapper |
11 for the XML API). | 11 for the XML API). |
12 | 12 |
13 API/library references: | 13 API/library references: |
14 - https://developers.google.com/storage/docs/reference-guide | 14 - https://developers.google.com/storage/docs/reference-guide |
15 - http://googlecloudstorage.blogspot.com/2012/09/google-cloud-storage-tutorial-u
sing-boto.html | 15 - http://googlecloudstorage.blogspot.com/2012/09/google-cloud-storage-tutorial-u
sing-boto.html |
16 """ | 16 """ |
17 # pylint: enable=C0301 | 17 # pylint: enable=C0301 |
18 | 18 |
19 # System-level imports | 19 # System-level imports |
20 import errno | 20 import errno |
21 import hashlib | 21 import hashlib |
| 22 import math |
22 import os | 23 import os |
23 import posixpath | 24 import posixpath |
24 import Queue | 25 import Queue |
25 import re | 26 import re |
26 import sys | 27 import sys |
27 import threading | 28 import threading |
| 29 import time |
28 | 30 |
29 # Imports from third-party code | 31 # Imports from third-party code |
30 TRUNK_DIRECTORY = os.path.abspath(os.path.join( | 32 TRUNK_DIRECTORY = os.path.abspath(os.path.join( |
31 os.path.dirname(__file__), os.pardir, os.pardir)) | 33 os.path.dirname(__file__), os.pardir, os.pardir)) |
32 for import_subdir in ['boto']: | 34 for import_subdir in ['boto']: |
33 import_dirpath = os.path.join( | 35 import_dirpath = os.path.join( |
34 TRUNK_DIRECTORY, 'third_party', 'externals', import_subdir) | 36 TRUNK_DIRECTORY, 'third_party', 'externals', import_subdir) |
35 if import_dirpath not in sys.path: | 37 if import_dirpath not in sys.path: |
36 # We need to insert at the beginning of the path, to make sure that our | 38 # We need to insert at the beginning of the path, to make sure that our |
37 # imported versions are favored over others that might be in the path. | 39 # imported versions are favored over others that might be in the path. |
(...skipping 341 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
379 if num_files_to_upload == 0: | 381 if num_files_to_upload == 0: |
380 return | 382 return |
381 if num_threads > num_files_to_upload: | 383 if num_threads > num_files_to_upload: |
382 num_threads = num_files_to_upload | 384 num_threads = num_files_to_upload |
383 | 385 |
384 # Create a work queue with all files that need to be uploaded. | 386 # Create a work queue with all files that need to be uploaded. |
385 q = Queue.Queue(maxsize=num_files_to_upload) | 387 q = Queue.Queue(maxsize=num_files_to_upload) |
386 for rel_path in source_fileset: | 388 for rel_path in source_fileset: |
387 q.put(rel_path) | 389 q.put(rel_path) |
388 | 390 |
| 391 err = {} |
| 392 |
389 # Spin up worker threads to read from the task queue. | 393 # Spin up worker threads to read from the task queue. |
390 def worker(): | 394 def worker(): |
391 while True: | 395 while True: |
392 try: | 396 try: |
393 rel_path = q.get(block=False) | 397 rel_path = q.get(block=False) |
394 except Queue.Empty: | 398 except Queue.Empty: |
395 return # no more tasks in the queue, so exit | 399 return # no more tasks in the queue, so exit |
396 print (' Uploading file %d/%d: %s' % ( | 400 print (' Uploading file %d/%d: %s' % ( |
397 num_files_to_upload - q.qsize(), num_files_to_upload, rel_path)) | 401 num_files_to_upload - q.qsize(), num_files_to_upload, rel_path)) |
398 self.upload_file( | 402 |
399 source_path=os.path.join(source_dir, rel_path), | 403 retries = 5 |
400 dest_bucket=b, | 404 for retry in range(retries): |
401 dest_path=posixpath.join(dest_dir, rel_path), | 405 try: |
402 upload_if=self.UploadIf.ALWAYS, | 406 self.upload_file( |
403 **kwargs) | 407 source_path=os.path.join(source_dir, rel_path), |
404 q.task_done() | 408 dest_bucket=b, |
| 409 dest_path=posixpath.join(dest_dir, rel_path), |
| 410 upload_if=self.UploadIf.ALWAYS, |
| 411 **kwargs) |
| 412 q.task_done() |
| 413 break |
| 414 except Exception as error: |
| 415 if retry < retries - 1: |
| 416 print ' Retrying upload, attempt #%d' % retry + 1 |
| 417 time.sleep(2 ** retry) |
| 418 else: |
| 419 err[rel_path] = error |
| 420 |
405 for _ in range(num_threads): | 421 for _ in range(num_threads): |
406 t = threading.Thread(target=worker) | 422 t = threading.Thread(target=worker) |
407 t.daemon = True | 423 t.daemon = True |
408 t.start() | 424 t.start() |
409 | 425 |
410 # Block until all files have been uploaded and all workers have exited. | 426 # Block until all files have been uploaded and all workers have exited. |
411 q.join() | 427 q.join() |
412 | 428 |
| 429 if err: |
| 430 errMsg = 'Failed to upload the following: \n\n' |
| 431 for rel_path, e in err.iteritems(): |
| 432 errMsg += '%s: %s\n' % (rel_path, e) |
| 433 raise Exception(errMsg) |
| 434 |
413 def download_file(self, source_bucket, source_path, dest_path, | 435 def download_file(self, source_bucket, source_path, dest_path, |
414 create_subdirs_if_needed=False, source_generation=None): | 436 create_subdirs_if_needed=False, source_generation=None): |
415 """Downloads a single file from Google Cloud Storage to local disk. | 437 """Downloads a single file from Google Cloud Storage to local disk. |
416 | 438 |
417 Args: | 439 Args: |
418 source_bucket: GS bucket to download the file from | 440 source_bucket: GS bucket to download the file from |
419 source_path: full path (Posix-style) within that bucket | 441 source_path: full path (Posix-style) within that bucket |
420 dest_path: full path (local-OS-style) on local disk to copy the file to | 442 dest_path: full path (local-OS-style) on local disk to copy the file to |
421 create_subdirs_if_needed: boolean; whether to create subdirectories as | 443 create_subdirs_if_needed: boolean; whether to create subdirectories as |
422 needed to create dest_path | 444 needed to create dest_path |
(...skipping 281 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
704 | 726 |
705 def _get_local_md5(path): | 727 def _get_local_md5(path): |
706 """Returns the MD5 hash of a file on local disk.""" | 728 """Returns the MD5 hash of a file on local disk.""" |
707 hasher = hashlib.md5() | 729 hasher = hashlib.md5() |
708 with open(path, 'rb') as f: | 730 with open(path, 'rb') as f: |
709 while True: | 731 while True: |
710 data = f.read(64*1024) | 732 data = f.read(64*1024) |
711 if not data: | 733 if not data: |
712 return hasher.hexdigest() | 734 return hasher.hexdigest() |
713 hasher.update(data) | 735 hasher.update(data) |
OLD | NEW |