Chromium Code Reviews

Unified Diff: py/utils/gs_utils.py

Issue 424553002: upload_dir_contents(): upload multiple files in parallel (Closed)
Base URL: https://skia.googlesource.com/common.git@master
Patch Set: replace prints with logging | Created 6 years, 5 months ago
Index: py/utils/gs_utils.py
diff --git a/py/utils/gs_utils.py b/py/utils/gs_utils.py
index f53b4d1427bf9078463547fdf7d5751ef437a24d..8ab12c513271fe29ca6cb4030d52f178e964bdd2 100644
--- a/py/utils/gs_utils.py
+++ b/py/utils/gs_utils.py
@@ -19,10 +19,13 @@ API/library references:
# System-level imports
import errno
import hashlib
+import logging
import os
import posixpath
import re
import sys
+from Queue import Queue
+from threading import Thread
# Imports from third-party code
TRUNK_DIRECTORY = os.path.abspath(os.path.join(
@@ -116,21 +119,24 @@ class GSUtils(object):
    IF_MODIFIED = 3  # if there is an existing file with the same name and
                     # contents, leave it alone
-  def __init__(self, boto_file_path=None):
+  def __init__(self, boto_file_path=None, logger=None):
    """Constructor.

    Params:
      boto_file_path: full path (local-OS-style) on local disk where .boto
          credentials file can be found. If None, then the GSUtils object
          created will be able to access only public files in Google Storage.
+      logger: a logging.Logger instance to use for logging output; if None,
+          one will be created with default characteristics

    Raises an exception if no file is found at boto_file_path, or if the file
    found there is malformed.
    """
+    self.logger = logger or logging.getLogger(__name__)
    self._gs_access_key_id = None
    self._gs_secret_access_key = None
    if boto_file_path:
-      print 'Reading boto file from %s' % boto_file_path
+      self.logger.info('Reading boto file from %s' % boto_file_path)
      boto_dict = _config_file_as_dict(filepath=boto_file_path)
      self._gs_access_key_id = boto_dict['gs_access_key_id']
      self._gs_secret_access_key = boto_dict['gs_secret_access_key']
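The logger-or-default line above is a standard injection idiom: callers may pass in a configured logging.Logger, and otherwise the object falls back to a module-level default. A minimal standalone sketch of the idiom (the Uploader class and its method are illustrative, not part of this patch):

    import logging

    class Uploader(object):
      def __init__(self, logger=None):
        # Fall back to a module-level logger when the caller supplies none.
        self.logger = logger or logging.getLogger(__name__)

      def run(self):
        self.logger.info('starting upload')

    # The caller decides whether and how records are emitted:
    logging.basicConfig(level=logging.INFO)
    Uploader().run()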
@@ -215,8 +221,8 @@ class GSUtils(object):
    if upload_if == self.UploadIf.IF_NEW:
      old_key = b.get_key(key_name=dest_path)
      if old_key:
-        print 'Skipping upload of existing file gs://%s/%s' % (
-            b.name, dest_path)
+        self.logger.info('Skipping upload of existing file gs://%s/%s' % (
+            b.name, dest_path))
        return
    elif upload_if == self.UploadIf.IF_MODIFIED:
      old_key = b.get_key(key_name=dest_path)
@@ -224,8 +230,9 @@ class GSUtils(object):
        if not local_md5:
          local_md5 = _get_local_md5(path=source_path)
        if ('"%s"' % local_md5) == old_key.etag:
-          print 'Skipping upload of unmodified file gs://%s/%s : %s' % (
-              b.name, dest_path, local_md5)
+          self.logger.info(
+              'Skipping upload of unmodified file gs://%s/%s : %s' % (
+                  b.name, dest_path, local_md5))
          return
    elif upload_if != self.UploadIf.ALWAYS:
      raise Exception('unknown value of upload_if: %s' % upload_if)
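The IF_MODIFIED branch skips the upload when the local file's MD5 matches the remote key's etag; for non-composite Google Storage objects the etag is the MD5 hex digest wrapped in double quotes, which is why the comparison quotes local_md5. _get_local_md5 is defined elsewhere in this file; a sketch of how such a helper is typically written, assuming only hashlib (the function name and chunk size here are illustrative):

    import hashlib

    def _get_local_md5_sketch(path, chunk_size=1 << 16):
      """Return the hex MD5 digest of the file at path, read in chunks."""
      hasher = hashlib.md5()
      with open(path, 'rb') as f:
        while True:
          chunk = f.read(chunk_size)
          if not chunk:
            break
          hasher.update(chunk)
      return hasher.hexdigest()

Reading in fixed-size chunks keeps memory use flat for large files, which matters since this may run once per candidate upload.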
@@ -280,7 +287,7 @@ class GSUtils(object):
        id_type=id_type, id_value=id_value, permission=permission)
  def upload_dir_contents(self, source_dir, dest_bucket, dest_dir,
-                          upload_if=UploadIf.ALWAYS, **kwargs):
+                          num_threads=10, upload_if=UploadIf.ALWAYS, **kwargs):
"""Recursively upload contents of a local directory to Google Storage.
params:
@@ -289,6 +296,7 @@ class GSUtils(object):
      dest_bucket: GS bucket to copy the files into
      dest_dir: full path (Posix-style) within that bucket; write the files into
          this directory. If None, write into the root directory of the bucket.
+      num_threads: how many files to upload at once
      upload_if: one of the UploadIf values, describing in which cases we should
          upload the file
      kwargs: any additional keyword arguments "inherited" from upload_file()
@@ -310,6 +318,7 @@ class GSUtils(object):
      relative_dirpath = dirpath[prefix_length:]
      for filename in filenames:
        source_fileset.add(os.path.join(relative_dirpath, filename))
+    num_files_total = len(source_fileset)

    # If we are only uploading files conditionally, remove any unnecessary
    # files from source_fileset.
@@ -342,14 +351,49 @@ class GSUtils(object):
    else:
      raise Exception('unknown value of upload_if: %s' % upload_if)
+    # Set up a counter of files that have been uploaded.
+    # Python's Global Interpreter Lock should make this thread-safe; see
+    # http://www.gossamer-threads.com/lists/python/dev/273403
+    num_files_to_upload = len(source_fileset)
+    atomic_incrementer = iter(xrange(1, num_files_to_upload+1)).next
+
    # Upload any files still in source_fileset.
-    for rel_path in sorted(source_fileset):
-      self.upload_file(
-          source_path=os.path.join(source_dir, rel_path),
-          dest_bucket=b,
-          dest_path=posixpath.join(dest_dir, rel_path),
-          upload_if=self.UploadIf.ALWAYS,
-          **kwargs)
+    self.logger.info('Uploading %d files, skipping %d ...' % (
+        num_files_to_upload, num_files_total - num_files_to_upload))
+    if num_files_to_upload == 0:
+      return
+    if num_threads > num_files_to_upload:
+      num_threads = num_files_to_upload
borenet 2014/07/28 13:32:11 Maybe line 358 should be moved here so that it doesn't …
epoger 2014/07/28 14:14:13 Done. I don't think there is significant performance …
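The atomic_incrementer defined above works because calling .next on a built-in iterator is a single operation under CPython's GIL, so no two threads can draw the same value; the python-dev thread cited in the patch's comment discusses this guarantee. A standalone sketch of the idiom (Python 2, matching the patch; the names here are illustrative):

    from threading import Thread

    counter = iter(xrange(1, 101)).next
    drawn = []

    def draw():
      # Each call returns a distinct value, even across threads.
      drawn.append(counter())

    threads = [Thread(target=draw) for _ in range(100)]
    for t in threads:
      t.start()
    for t in threads:
      t.join()
    assert sorted(drawn) == range(1, 101)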
+    q = Queue(maxsize=2*num_threads)
+    end_of_queue = object()
+    def worker():
+      while True:
+        rel_path = q.get()
+        if rel_path is end_of_queue:
+          q.task_done()
+          return
borenet 2014/07/28 13:32:11 Why not use q.get(False) or q.get_nowait() and try …
epoger 2014/07/28 14:14:14 The problem with checking for an empty queue is that …
borenet 2014/07/28 14:20:07 I was thinking of #2, since you're dealing with the …
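The trade-off discussed above: the patch pushes one end_of_queue sentinel per worker, so each worker blocks in q.get() until it is explicitly told to exit; q.get_nowait() instead raises Queue.Empty the moment the queue looks empty, which can race with a producer that has not finished enqueueing. A sketch of both shapes (handle() is a stand-in for the per-item work; in the patch it is the self.upload_file() call):

    from Queue import Queue, Empty

    END_OF_QUEUE = object()

    def handle(item):
      # Stand-in for per-item work.
      print 'handled %s' % item

    def worker_with_sentinel(q):
      # Safe to start before the producer has filled the queue: blocks
      # until an item or the sentinel arrives.
      while True:
        item = q.get()
        if item is END_OF_QUEUE:
          q.task_done()
          return
        handle(item)
        q.task_done()

    def worker_with_get_nowait(q):
      # Exits as soon as the queue is momentarily empty, so it is only
      # correct if the queue is fully loaded before any worker starts.
      while True:
        try:
          item = q.get_nowait()
        except Empty:
          return
        handle(item)
        q.task_done()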
+        self.logger.info(' Uploading file %d/%d: %s' % (
+            atomic_incrementer(), num_files_to_upload, rel_path))
+        self.upload_file(
+            source_path=os.path.join(source_dir, rel_path),
+            dest_bucket=b,
+            dest_path=posixpath.join(dest_dir, rel_path),
+            upload_if=self.UploadIf.ALWAYS,
+            **kwargs)
+        q.task_done()
+
+    # Spin up the threads.
+    for _ in range(num_threads):
+      t = Thread(target=worker)
+      t.daemon = True
+      t.start()
+
+    # Start loading files into the work queue.
+    for rel_path in source_fileset:
+      q.put(rel_path)
+
+    # Tell all workers to go home.
+    for _ in range(num_threads):
+      q.put(end_of_queue)
+
+    # Block until all files have been uploaded and all workers have gone home.
+    q.join()
  def download_file(self, source_bucket, source_path, dest_path,
                    create_subdirs_if_needed=False):
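With this patch applied, a directory upload fans out across worker threads. A hedged usage sketch (the bucket, paths, and .boto location are placeholders; the parameter names come from the patch, and the import path depends on your checkout layout):

    import logging
    import gs_utils  # py/utils/gs_utils.py

    logging.basicConfig(level=logging.INFO)
    gs = gs_utils.GSUtils(boto_file_path='/path/to/.boto')
    gs.upload_dir_contents(
        source_dir='/tmp/to_upload',
        dest_bucket='example-bucket',
        dest_dir='results/run-1',
        num_threads=10,
        upload_if=gs_utils.GSUtils.UploadIf.IF_MODIFIED)

Because num_threads is clamped to the number of files and the queue is bounded at 2*num_threads, memory use stays modest even for very large trees.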