Chromium Code Reviews

Index: py/utils/gs_utils.py
diff --git a/py/utils/gs_utils.py b/py/utils/gs_utils.py
index f53b4d1427bf9078463547fdf7d5751ef437a24d..8ab12c513271fe29ca6cb4030d52f178e964bdd2 100644
--- a/py/utils/gs_utils.py
+++ b/py/utils/gs_utils.py
@@ -19,10 +19,13 @@ API/library references:
 # System-level imports
 import errno
 import hashlib
+import logging
 import os
 import posixpath
 import re
 import sys
+from Queue import Queue
+from threading import Thread

 # Imports from third-party code
 TRUNK_DIRECTORY = os.path.abspath(os.path.join(
@@ -116,21 +119,24 @@ class GSUtils(object):
     IF_MODIFIED = 3  # if there is an existing file with the same name and
                      # contents, leave it alone

-  def __init__(self, boto_file_path=None):
+  def __init__(self, boto_file_path=None, logger=None):
     """Constructor.

     Params:
       boto_file_path: full path (local-OS-style) on local disk where .boto
           credentials file can be found. If None, then the GSUtils object
           created will be able to access only public files in Google Storage.
+      logger: a logging.Logger instance to use for logging output; if None,
+          one will be created with default characteristics

     Raises an exception if no file is found at boto_file_path, or if the file
     found there is malformed.
     """
+    self.logger = logger or logging.getLogger(__name__)
     self._gs_access_key_id = None
     self._gs_secret_access_key = None
     if boto_file_path:
-      print 'Reading boto file from %s' % boto_file_path
+      self.logger.info('Reading boto file from %s' % boto_file_path)
       boto_dict = _config_file_as_dict(filepath=boto_file_path)
       self._gs_access_key_id = boto_dict['gs_access_key_id']
       self._gs_secret_access_key = boto_dict['gs_secret_access_key']
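The new logger parameter lets a caller route GSUtils output through its own logging setup rather than bare print statements. A minimal sketch of a call site, where the import path and logger name are illustrative rather than part of this patch:

import logging

from py.utils import gs_utils  # hypothetical import path

# Caller-owned logger; GSUtils will emit its INFO messages through it.
logger = logging.getLogger('my_uploader')
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

# No .boto file: this instance can only read public files.
gs = gs_utils.GSUtils(boto_file_path=None, logger=logger)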
@@ -215,8 +221,8 @@ class GSUtils(object):
     if upload_if == self.UploadIf.IF_NEW:
       old_key = b.get_key(key_name=dest_path)
       if old_key:
-        print 'Skipping upload of existing file gs://%s/%s' % (
-            b.name, dest_path)
+        self.logger.info('Skipping upload of existing file gs://%s/%s' % (
+            b.name, dest_path))
         return
     elif upload_if == self.UploadIf.IF_MODIFIED:
       old_key = b.get_key(key_name=dest_path)
@@ -224,8 +230,9 @@ class GSUtils(object):
       if not local_md5:
         local_md5 = _get_local_md5(path=source_path)
       if ('"%s"' % local_md5) == old_key.etag:
-        print 'Skipping upload of unmodified file gs://%s/%s : %s' % (
-            b.name, dest_path, local_md5)
+        self.logger.info(
+            'Skipping upload of unmodified file gs://%s/%s : %s' % (
+                b.name, dest_path, local_md5))
         return
     elif upload_if != self.UploadIf.ALWAYS:
       raise Exception('unknown value of upload_if: %s' % upload_if)
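As background on the IF_MODIFIED branch: for a simple (non-composite) Google Storage object, the etag boto reports is the hex MD5 digest of the contents wrapped in double quotes, which is why the local digest is quoted before comparison. A sketch of how a helper like _get_local_md5 plausibly computes that digest (the real implementation lives elsewhere in this file and may differ):

import hashlib

def _get_local_md5(path):
  # Hash in 64KB chunks so large files need not fit in memory.
  hasher = hashlib.md5()
  with open(path, 'rb') as f:
    for chunk in iter(lambda: f.read(65536), ''):
      hasher.update(chunk)
  return hasher.hexdigest()

# Quoted to match the etag format, e.g. '"0cc175b9c0f1b6a831c399e269772661"'.
print '"%s"' % _get_local_md5('/tmp/example.dat')  # path is illustrative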
@@ -280,7 +287,7 @@ class GSUtils(object):
         id_type=id_type, id_value=id_value, permission=permission)

   def upload_dir_contents(self, source_dir, dest_bucket, dest_dir,
-                          upload_if=UploadIf.ALWAYS, **kwargs):
+                          num_threads=10, upload_if=UploadIf.ALWAYS, **kwargs):
     """Recursively upload contents of a local directory to Google Storage.

     params:
@@ -289,6 +296,7 @@ class GSUtils(object):
       dest_bucket: GS bucket to copy the files into
       dest_dir: full path (Posix-style) within that bucket; write the files into
           this directory. If None, write into the root directory of the bucket.
+      num_threads: how many files to upload at once
       upload_if: one of the UploadIf values, describing in which cases we should
           upload the file
       kwargs: any additional keyword arguments "inherited" from upload_file()
@@ -310,6 +318,7 @@ class GSUtils(object):
       relative_dirpath = dirpath[prefix_length:]
       for filename in filenames:
         source_fileset.add(os.path.join(relative_dirpath, filename))
+    num_files_total = len(source_fileset)

     # If we are only uploading files conditionally, remove any unnecessary
     # files from source_fileset.
@@ -342,14 +351,49 @@
       else:
         raise Exception('unknown value of upload_if: %s' % upload_if)

+    # Set up a counter of files that have been uploaded.
+    # Python's Global Interpreter Lock should make this thread-safe; see
+    # http://www.gossamer-threads.com/lists/python/dev/273403
+    num_files_to_upload = len(source_fileset)
+    atomic_incrementer = iter(xrange(1, num_files_to_upload+1)).next
+
     # Upload any files still in source_fileset.
-    for rel_path in sorted(source_fileset):
-      self.upload_file(
-          source_path=os.path.join(source_dir, rel_path),
-          dest_bucket=b,
-          dest_path=posixpath.join(dest_dir, rel_path),
-          upload_if=self.UploadIf.ALWAYS,
-          **kwargs)
+    self.logger.info('Uploading %d files, skipping %d ...' % (
+        num_files_to_upload, num_files_total - num_files_to_upload))
+    if num_files_to_upload == 0:
+      return
+    if num_threads > num_files_to_upload:
+      num_threads = num_files_to_upload
borenet (2014/07/28 13:32:11):
  Maybe line 358 should be moved here so that it doe…

epoger (2014/07/28 14:14:13):
  Done. I don't think there is significant performa…
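An aside on the counter introduced a few lines up: the patch leans on CPython's Global Interpreter Lock, under which fetching the next item from a C-implemented iterator is effectively atomic, so iter(xrange(...)).next acts as a lock-free counter. A standalone Python 2 sketch of the idiom, with illustrative names and sizes:

from threading import Thread

counter = iter(xrange(1, 1001)).next  # each call yields a distinct value

results = []
def grab():
  for _ in xrange(100):
    results.append(counter())  # list.append is likewise GIL-protected

threads = [Thread(target=grab) for _ in xrange(10)]
for t in threads:
  t.start()
for t in threads:
  t.join()

# 10 threads x 100 calls each: every value 1..1000 appears exactly once.
assert sorted(results) == range(1, 1001)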
+    q = Queue(maxsize=2*num_threads)
+    end_of_queue = object()
+    def worker():
+      while True:
+        rel_path = q.get()
+        if rel_path is end_of_queue:
+          q.task_done()
+          return
borenet (2014/07/28 13:32:11):
  Why not use q.get(False) or q.get_nowait() and try…

epoger (2014/07/28 14:14:14):
  The problem with checking for an empty queue is th…

borenet (2014/07/28 14:20:07):
  I was thinking of #2, since you're dealing with th…
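The truncated exchange above appears to turn on worker shutdown. A q.get_nowait() loop that treats Queue.Empty as "all done" would be racy here, because the workers are started before the main thread finishes filling the bounded queue, so a worker can observe a momentarily empty queue while files remain. The patch instead blocks in q.get() and uses a unique sentinel to mark the true end. A side-by-side Python 2 sketch, where handle() is a hypothetical stand-in for the upload:

from Queue import Queue, Empty

q = Queue(maxsize=4)

def handle(item):
  pass  # stand-in for the real per-item work

# Variant raised in the review: racy, because an empty queue does not
# necessarily mean the producer is finished.
def worker_nowait():
  while True:
    try:
      item = q.get_nowait()
    except Empty:
      return  # may fire while items are still being enqueued
    handle(item)
    q.task_done()

# Variant the patch uses: block until something arrives, and let a
# sentinel object that cannot collide with real work mark the end.
end_of_queue = object()
def worker_sentinel():
  while True:
    item = q.get()
    if item is end_of_queue:
      q.task_done()
      return
    handle(item)
    q.task_done()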
+        self.logger.info(' Uploading file %d/%d: %s' % (
+            atomic_incrementer(), num_files_to_upload, rel_path))
+        self.upload_file(
+            source_path=os.path.join(source_dir, rel_path),
+            dest_bucket=b,
+            dest_path=posixpath.join(dest_dir, rel_path),
+            upload_if=self.UploadIf.ALWAYS,
+            **kwargs)
+        q.task_done()
+    # Spin up the threads.
+    for _ in range(num_threads):
+      t = Thread(target=worker)
+      t.daemon = True
+      t.start()
+    # Start loading files into the work queue.
+    for rel_path in source_fileset:
+      q.put(rel_path)
+    # Tell all workers to go home.
+    for _ in range(num_threads):
+      q.put(end_of_queue)
+    # Block until all files have been uploaded and all workers have gone home.
+    q.join()

   def download_file(self, source_bucket, source_path, dest_path,
                     create_subdirs_if_needed=False):
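For reference, a hypothetical call site exercising the new parallel path; the bucket name, paths, and thread count below are made up for illustration:

from py.utils import gs_utils  # hypothetical import path

gs = gs_utils.GSUtils(boto_file_path='/home/me/.boto')
gs.upload_dir_contents(
    source_dir='/tmp/render-output',
    dest_bucket='my-test-bucket',
    dest_dir='renders/2014-07-28',
    num_threads=20,
    upload_if=gs_utils.GSUtils.UploadIf.IF_MODIFIED)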