Index: py/utils/gs_utils.py
diff --git a/py/utils/gs_utils.py b/py/utils/gs_utils.py
index f53b4d1427bf9078463547fdf7d5751ef437a24d..7824c79b6c66d266f9ba68abe4ed3d6c672bd54e 100644
--- a/py/utils/gs_utils.py
+++ b/py/utils/gs_utils.py
@@ -19,10 +19,13 @@ API/library references:
 # System-level imports
 import errno
 import hashlib
+import logging
 import os
 import posixpath
+import Queue
 import re
 import sys
+import threading
 # Imports from third-party code
 TRUNK_DIRECTORY = os.path.abspath(os.path.join(
@@ -43,6 +46,13 @@ from boto.s3.bucketlistresultset import BucketListResultSet
 from boto.s3.connection import SubdomainCallingFormat
 from boto.s3.prefix import Prefix
+# How many files to upload at once, by default.
+# TODO(epoger): Is there a way to compute this intelligently? To some extent
+# it is a function of how many cores are on the machine, and how many other
+# processes it is running; but it's probably more a function of how much time
+# each core sits idle waiting for network I/O to complete.
+DEFAULT_UPLOAD_THREADS = 10
+
 class AnonymousGSConnection(GSConnection):
   """GSConnection class that allows anonymous connections.
@@ -116,21 +126,24 @@ class GSUtils(object):
   IF_MODIFIED = 3  # if there is an existing file with the same name and
                    # contents, leave it alone
-  def __init__(self, boto_file_path=None):
+  def __init__(self, boto_file_path=None, logger=None):
     """Constructor.
     Params:
       boto_file_path: full path (local-OS-style) on local disk where .boto
           credentials file can be found. If None, then the GSUtils object
           created will be able to access only public files in Google Storage.
+      logger: a logging.Logger to use for log output; if None, the
+          module-level logger (logging.getLogger(__name__)) is used
     Raises an exception if no file is found at boto_file_path, or if the file
     found there is malformed.
     """
+    self.logger = logger or logging.getLogger(__name__)
     self._gs_access_key_id = None
     self._gs_secret_access_key = None
     if boto_file_path:
-      print 'Reading boto file from %s' % boto_file_path
+      self.logger.info('Reading boto file from %s', boto_file_path)
       boto_dict = _config_file_as_dict(filepath=boto_file_path)
       self._gs_access_key_id = boto_dict['gs_access_key_id']
       self._gs_secret_access_key = boto_dict['gs_secret_access_key']
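
A minimal usage sketch for the new logger parameter (the logger name and
.boto path below are hypothetical):

    import logging
    logging.basicConfig(level=logging.INFO)
    gs = GSUtils(boto_file_path='/path/to/.boto',
                 logger=logging.getLogger('gs_upload'))

Injecting a logger lets callers route these messages through their own
handlers and levels rather than whatever the root logger is configured with.
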
@@ -215,8 +228,8 @@ class GSUtils(object):
     if upload_if == self.UploadIf.IF_NEW:
       old_key = b.get_key(key_name=dest_path)
       if old_key:
-        print 'Skipping upload of existing file gs://%s/%s' % (
-            b.name, dest_path)
+        self.logger.info('Skipping upload of existing file gs://%s/%s',
+                         b.name, dest_path)
         return
     elif upload_if == self.UploadIf.IF_MODIFIED:
       old_key = b.get_key(key_name=dest_path)
@@ -224,8 +237,9 @@ class GSUtils(object):
       if not local_md5:
         local_md5 = _get_local_md5(path=source_path)
       if ('"%s"' % local_md5) == old_key.etag:
-        print 'Skipping upload of unmodified file gs://%s/%s : %s' % (
-            b.name, dest_path, local_md5)
+        self.logger.info(
+            'Skipping upload of unmodified file gs://%s/%s : %s',
+            b.name, dest_path, local_md5)
         return
     elif upload_if != self.UploadIf.ALWAYS:
       raise Exception('unknown value of upload_if: %s' % upload_if)
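
For reference, a sketch of how a caller picks one of the three UploadIf modes,
reusing the gs instance from the sketch above (bucket and paths are
hypothetical):

    # IF_MODIFIED compares the local file's MD5 against the remote key's
    # etag, as implemented above, and skips the upload when they match.
    gs.upload_file(source_path='out/image.png',
                   dest_bucket='my-bucket',
                   dest_path='images/image.png',
                   upload_if=GSUtils.UploadIf.IF_MODIFIED)
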
@@ -280,6 +294,7 @@ class GSUtils(object):
         id_type=id_type, id_value=id_value, permission=permission)
   def upload_dir_contents(self, source_dir, dest_bucket, dest_dir,
+                          num_threads=DEFAULT_UPLOAD_THREADS,
                           upload_if=UploadIf.ALWAYS, **kwargs):
     """Recursively upload contents of a local directory to Google Storage.
@@ -289,6 +304,7 @@ class GSUtils(object):
       dest_bucket: GS bucket to copy the files into
       dest_dir: full path (Posix-style) within that bucket; write the files into
           this directory. If None, write into the root directory of the bucket.
+      num_threads: how many files to upload at once
       upload_if: one of the UploadIf values, describing in which cases we should
           upload the file
       kwargs: any additional keyword arguments "inherited" from upload_file()
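
For example, a caller can now choose the degree of parallelism per call
(names below are hypothetical):

    gs.upload_dir_contents(source_dir='out/images',
                           dest_bucket='my-bucket',
                           dest_dir='images',
                           num_threads=4,
                           upload_if=GSUtils.UploadIf.IF_NEW)
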
@@ -310,6 +326,7 @@ class GSUtils(object):
       relative_dirpath = dirpath[prefix_length:]
       for filename in filenames:
         source_fileset.add(os.path.join(relative_dirpath, filename))
+    num_files_total = len(source_fileset)
     # If we are only uploading files conditionally, remove any unnecessary
     # files from source_fileset.
@@ -343,13 +360,42 @@ class GSUtils(object):
       raise Exception('unknown value of upload_if: %s' % upload_if)
     # Upload any files still in source_fileset.
-    for rel_path in sorted(source_fileset):
-      self.upload_file(
-          source_path=os.path.join(source_dir, rel_path),
-          dest_bucket=b,
-          dest_path=posixpath.join(dest_dir, rel_path),
-          upload_if=self.UploadIf.ALWAYS,
-          **kwargs)
+    num_files_to_upload = len(source_fileset)
+    self.logger.info('Uploading %d files, skipping %d ...',
+                     num_files_to_upload,
+                     num_files_total - num_files_to_upload)
+    if num_files_to_upload == 0:
+      return
+    if num_threads > num_files_to_upload:
+      num_threads = num_files_to_upload
+
+    # Create a work queue with all files that need to be uploaded.
+    q = Queue.Queue(maxsize=num_files_to_upload)
+    for rel_path in source_fileset:
+      q.put(rel_path)
+
+    # Spin up worker threads to read from the task queue.
+    def worker():
+      while True:
+        try:
+          rel_path = q.get(block=False)
+        except Queue.Empty:
+          return  # no more tasks in the queue, so exit
+        self.logger.info('  Uploading file %d/%d: %s',
+                         num_files_to_upload - q.qsize(),
+                         num_files_to_upload, rel_path)
+        try:
+          self.upload_file(
+              source_path=os.path.join(source_dir, rel_path),
+              dest_bucket=b,
+              dest_path=posixpath.join(dest_dir, rel_path),
+              upload_if=self.UploadIf.ALWAYS,
+              **kwargs)
+        except Exception:
+          # Log the failure but keep this worker alive; without the
+          # task_done() below, an exception raised here would leave
+          # q.join() blocked forever.
+          self.logger.exception('Failed to upload %s', rel_path)
+        finally:
+          q.task_done()
+    for _ in range(num_threads):
+      t = threading.Thread(target=worker)
+      t.daemon = True
+      t.start()
+
+    # Block until all files have been uploaded and all workers have exited.
+    q.join()
   def download_file(self, source_bucket, source_path, dest_path,
                     create_subdirs_if_needed=False):
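
The queue-plus-daemon-threads pattern above is the standard Python 2 idiom for
a fixed-size worker pool. If this code is ever ported to Python 3, the same
logic can be written with concurrent.futures; a rough equivalent sketch (the
upload_all helper is hypothetical, not part of this change):

    from concurrent.futures import ThreadPoolExecutor
    import os
    import posixpath

    def upload_all(gs, source_dir, b, dest_dir, source_fileset, num_threads):
      # map() runs gs.upload_file on each path across num_threads workers;
      # consuming the results re-raises any exception from a worker.
      with ThreadPoolExecutor(max_workers=num_threads) as pool:
        list(pool.map(
            lambda rel_path: gs.upload_file(
                source_path=os.path.join(source_dir, rel_path),
                dest_bucket=b,
                dest_path=posixpath.join(dest_dir, rel_path),
                upload_if=gs.UploadIf.ALWAYS),
            sorted(source_fileset)))

Unlike the hand-rolled pool, the executor surfaces the first worker exception
to the caller instead of only logging it.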