Chromium Code Reviews

Unified diff: py/utils/gs_utils.py

Issue 1088853004: Add retries to gs upload (Closed)
Base URL: https://skia.googlesource.com/common.git@master
Patch Set: Don't stop a thread when a single upload fails
#!/usr/bin/python

# pylint: disable=C0301
"""
Copyright 2014 Google Inc.

Use of this source code is governed by a BSD-style license that can be
found in the LICENSE file.

Utilities for accessing Google Cloud Storage, using the boto library (wrapper
for the XML API).

API/library references:
- https://developers.google.com/storage/docs/reference-guide
- http://googlecloudstorage.blogspot.com/2012/09/google-cloud-storage-tutorial-using-boto.html
"""
# pylint: enable=C0301

# System-level imports
import errno
import hashlib
+import math
import os
import posixpath
import Queue
import re
import sys
import threading
+import time

# Imports from third-party code
TRUNK_DIRECTORY = os.path.abspath(os.path.join(
    os.path.dirname(__file__), os.pardir, os.pardir))
for import_subdir in ['boto']:
  import_dirpath = os.path.join(
      TRUNK_DIRECTORY, 'third_party', 'externals', import_subdir)
  if import_dirpath not in sys.path:
    # We need to insert at the beginning of the path, to make sure that our
    # imported versions are favored over others that might be in the path.

(... 341 unchanged lines skipped ...)
    if num_files_to_upload == 0:
      return
    if num_threads > num_files_to_upload:
      num_threads = num_files_to_upload

    # Create a work queue with all files that need to be uploaded.
    q = Queue.Queue(maxsize=num_files_to_upload)
    for rel_path in source_fileset:
      q.put(rel_path)

+    err = {}
+
    # Spin up worker threads to read from the task queue.
    def worker():
      while True:
        try:
          rel_path = q.get(block=False)
        except Queue.Empty:
          return  # no more tasks in the queue, so exit
        print (' Uploading file %d/%d: %s' % (
            num_files_to_upload - q.qsize(), num_files_to_upload, rel_path))
-        self.upload_file(
-            source_path=os.path.join(source_dir, rel_path),
-            dest_bucket=b,
-            dest_path=posixpath.join(dest_dir, rel_path),
-            upload_if=self.UploadIf.ALWAYS,
-            **kwargs)
-        q.task_done()
+
+        retries = 5
+        for retry in range(retries):
+          try:
+            self.upload_file(
+                source_path=os.path.join(source_dir, rel_path),
+                dest_bucket=b,
+                dest_path=posixpath.join(dest_dir, rel_path),
+                upload_if=self.UploadIf.ALWAYS,
+                **kwargs)
+            q.task_done()
+            break
+          except Exception as error:
+            if retry < retries - 1:
+              print ' Retrying upload, attempt #%d' % (retry + 1)
+              time.sleep(math.pow(2, range))
mtklein 2015/04/14 17:17:14: time.sleep(2 ** retry) ?
borenet 2015/04/14 17:20:19: Done.
+            else:
+              err[rel_path] = error
+
    for _ in range(num_threads):
      t = threading.Thread(target=worker)
      t.daemon = True
      t.start()

    # Block until all files have been uploaded and all workers have exited.
    q.join()

+    if err:
+      errMsg = 'Failed to upload the following: \n\n'
+      for rel_path, e in err.iteritems():
+        errMsg += '%s: %s\n' % (rel_path, e)
+      raise Exception(errMsg)
+
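As mtklein's comment above suggests (and borenet confirmed applying), the backoff can simply be time.sleep(2 ** retry). One further subtlety worth sketching: in this patch set q.task_done() is only reached on a successful upload, so q.join() would block forever if a file exhausts all of its retries. Below is a minimal standalone sketch, not the change under review, with a hypothetical upload_one() callable standing in for the self.upload_file(...) call; it applies the suggested backoff and guarantees task_done() via try/finally:

import Queue
import threading
import time

def run_uploads(rel_paths, upload_one, num_threads=10, retries=5):
  q = Queue.Queue()
  for rel_path in rel_paths:
    q.put(rel_path)
  err = {}

  def worker():
    while True:
      try:
        rel_path = q.get(block=False)
      except Queue.Empty:
        return  # queue drained; let this thread exit
      try:
        for retry in range(retries):
          try:
            upload_one(rel_path)
            break  # success; stop retrying this file
          except Exception as error:
            if retry < retries - 1:
              time.sleep(2 ** retry)  # backoff: 1s, 2s, 4s, 8s
            else:
              err[rel_path] = error  # out of retries; record and move on
      finally:
        q.task_done()  # always mark the item done so q.join() cannot hang

  for _ in range(num_threads):
    t = threading.Thread(target=worker)
    t.daemon = True
    t.start()
  q.join()
  return err  # empty dict means every file uploaded

Errors are still collected per file and surfaced only after q.join() returns, which preserves this patch set's intent: a single failed upload does not stop its worker thread.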
  def download_file(self, source_bucket, source_path, dest_path,
                    create_subdirs_if_needed=False, source_generation=None):
    """Downloads a single file from Google Cloud Storage to local disk.

    Args:
      source_bucket: GS bucket to download the file from
      source_path: full path (Posix-style) within that bucket
      dest_path: full path (local-OS-style) on local disk to copy the file to
      create_subdirs_if_needed: boolean; whether to create subdirectories as
          needed to create dest_path

(... 281 unchanged lines skipped ...)
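For reference, a short usage sketch of download_file with hypothetical bucket and path values, assuming gs is an instance of the enclosing class (whose definition lies outside this diff chunk):

import os

gs.download_file(
    source_bucket='my-test-bucket',               # hypothetical GS bucket
    source_path='logs/run1/output.txt',           # Posix-style path in bucket
    dest_path=os.path.join('tmp', 'output.txt'),  # local-OS-style destination
    create_subdirs_if_needed=True)                # create tmp/ if missing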

def _get_local_md5(path):
  """Returns the MD5 hash of a file on local disk."""
  hasher = hashlib.md5()
  with open(path, 'rb') as f:
    while True:
      data = f.read(64*1024)
      if not data:
        return hasher.hexdigest()
      hasher.update(data)
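_get_local_md5 hashes in 64 KiB chunks, so arbitrarily large files never need to fit in memory. A sketch of one way it can be used to skip redundant transfers, assuming a boto bucket object b as in the upload code above; for non-composite GCS objects, boto's Key.etag is the object's MD5 hex digest wrapped in quotes:

local_md5 = _get_local_md5('/tmp/image.png')  # hypothetical local path
key = b.get_key('images/image.png')           # boto Key, or None if absent
if key and key.etag.strip('"') == local_md5:
  print ' Remote file is identical; skipping upload'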