Index: py/utils/gs_utils.py
diff --git a/py/utils/gs_utils.py b/py/utils/gs_utils.py
index f53b4d1427bf9078463547fdf7d5751ef437a24d..7824c79b6c66d266f9ba68abe4ed3d6c672bd54e 100644
--- a/py/utils/gs_utils.py
+++ b/py/utils/gs_utils.py
@@ -19,10 +19,13 @@ API/library references:
 # System-level imports
 import errno
 import hashlib
+import logging
 import os
 import posixpath
+import Queue
 import re
 import sys
+import threading

 # Imports from third-party code
 TRUNK_DIRECTORY = os.path.abspath(os.path.join(
@@ -43,6 +46,13 @@ from boto.s3.bucketlistresultset import BucketListResultSet
 from boto.s3.connection import SubdomainCallingFormat
 from boto.s3.prefix import Prefix

+# How many files to upload at once, by default.
+# TODO(epoger): Is there a way to compute this intelligently? To some extent
+# it is a function of how many cores are on the machine, and how many other
+# processes it is running; but it's probably more a function of how much time
+# each core sits idle waiting for network I/O to complete.
+DEFAULT_UPLOAD_THREADS = 10
+

 class AnonymousGSConnection(GSConnection):
   """GSConnection class that allows anonymous connections.
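
[Note on the TODO above -- illustrative only, not part of this patch: one way
to pick the default more intelligently would be to scale it with the number of
cores, on the theory that uploads are network-I/O-bound, so each core can keep
several idle threads fed. The 4x multiplier and the cap of 32 below are
guesses, not measured values.]

    # Hypothetical heuristic for choosing an upload thread count.
    import multiprocessing

    def _guess_upload_threads():
      try:
        # Several threads per core, since most time is spent waiting on I/O.
        return min(32, 4 * multiprocessing.cpu_count())
      except NotImplementedError:
        # cpu_count() can fail on unusual platforms; fall back to the default.
        return DEFAULT_UPLOAD_THREADS
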
@@ -116,21 +126,24 @@ class GSUtils(object):
     IF_MODIFIED = 3 # if there is an existing file with the same name and
                     # contents, leave it alone

-  def __init__(self, boto_file_path=None):
+  def __init__(self, boto_file_path=None, logger=None):
     """Constructor.

     Params:
       boto_file_path: full path (local-OS-style) on local disk where .boto
           credentials file can be found. If None, then the GSUtils object
           created will be able to access only public files in Google Storage.
+      logger: a logging.Logger instance to use for logging output; if None,
+          one will be created with default characteristics

     Raises an exception if no file is found at boto_file_path, or if the file
     found there is malformed.
     """
+    self.logger = logger or logging.getLogger(__name__)
     self._gs_access_key_id = None
     self._gs_secret_access_key = None
     if boto_file_path:
-      print 'Reading boto file from %s' % boto_file_path
+      self.logger.info('Reading boto file from %s' % boto_file_path)
       boto_dict = _config_file_as_dict(filepath=boto_file_path)
       self._gs_access_key_id = boto_dict['gs_access_key_id']
       self._gs_secret_access_key = boto_dict['gs_secret_access_key']
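
[Usage note for the new logger parameter -- illustrative only; the logger name
and .boto path below are made-up examples. Callers who want GSUtils output
routed through their own logging configuration can pass in a preconfigured
logging.Logger; otherwise GSUtils falls back to logging.getLogger(__name__).]

    import logging
    import os

    # Route GSUtils messages through a caller-owned logger.
    logging.basicConfig(level=logging.INFO)
    gs = GSUtils(boto_file_path=os.path.expanduser('~/.boto'),
                 logger=logging.getLogger('gs_uploader'))
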
@@ -215,8 +228,8 @@ class GSUtils(object):
     if upload_if == self.UploadIf.IF_NEW:
       old_key = b.get_key(key_name=dest_path)
       if old_key:
-        print 'Skipping upload of existing file gs://%s/%s' % (
-            b.name, dest_path)
+        self.logger.info('Skipping upload of existing file gs://%s/%s' % (
+            b.name, dest_path))
         return
     elif upload_if == self.UploadIf.IF_MODIFIED:
       old_key = b.get_key(key_name=dest_path)
@@ -224,8 +237,9 @@ class GSUtils(object):
         if not local_md5:
           local_md5 = _get_local_md5(path=source_path)
         if ('"%s"' % local_md5) == old_key.etag:
-          print 'Skipping upload of unmodified file gs://%s/%s : %s' % (
-              b.name, dest_path, local_md5)
+          self.logger.info(
+              'Skipping upload of unmodified file gs://%s/%s : %s' % (
+                  b.name, dest_path, local_md5))
           return
     elif upload_if != self.UploadIf.ALWAYS:
       raise Exception('unknown value of upload_if: %s' % upload_if)
@@ -280,6 +294,7 @@ class GSUtils(object):
         id_type=id_type, id_value=id_value, permission=permission)

   def upload_dir_contents(self, source_dir, dest_bucket, dest_dir,
+                          num_threads=DEFAULT_UPLOAD_THREADS,
                           upload_if=UploadIf.ALWAYS, **kwargs):
     """Recursively upload contents of a local directory to Google Storage.

@@ -289,6 +304,7 @@ class GSUtils(object):
       dest_bucket: GS bucket to copy the files into
       dest_dir: full path (Posix-style) within that bucket; write the files into
           this directory. If None, write into the root directory of the bucket.
+      num_threads: how many files to upload at once
       upload_if: one of the UploadIf values, describing in which cases we should
           upload the file
       kwargs: any additional keyword arguments "inherited" from upload_file()
@@ -310,6 +326,7 @@ class GSUtils(object):
       relative_dirpath = dirpath[prefix_length:]
       for filename in filenames:
         source_fileset.add(os.path.join(relative_dirpath, filename))
+    num_files_total = len(source_fileset)

     # If we are only uploading files conditionally, remove any unnecessary
     # files from source_fileset.
@@ -343,13 +360,42 @@ class GSUtils(object):
       raise Exception('unknown value of upload_if: %s' % upload_if)

     # Upload any files still in source_fileset.
-    for rel_path in sorted(source_fileset):
-      self.upload_file(
-          source_path=os.path.join(source_dir, rel_path),
-          dest_bucket=b,
-          dest_path=posixpath.join(dest_dir, rel_path),
-          upload_if=self.UploadIf.ALWAYS,
-          **kwargs)
+    num_files_to_upload = len(source_fileset)
+    self.logger.info('Uploading %d files, skipping %d ...' % (
+        num_files_to_upload, num_files_total - num_files_to_upload))
+    if num_files_to_upload == 0:
+      return
+    if num_threads > num_files_to_upload:
+      num_threads = num_files_to_upload
+
+    # Create a work queue with all files that need to be uploaded.
+    q = Queue.Queue(maxsize=num_files_to_upload)
+    for rel_path in source_fileset:
+      q.put(rel_path)
+
+    # Spin up worker threads to read from the task queue.
+    def worker():
+      while True:
+        try:
+          rel_path = q.get(block=False)
+        except Queue.Empty:
+          return  # no more tasks in the queue, so exit
+        self.logger.info(' Uploading file %d/%d: %s' % (
+            num_files_to_upload - q.qsize(), num_files_to_upload, rel_path))
+        self.upload_file(
+            source_path=os.path.join(source_dir, rel_path),
+            dest_bucket=b,
+            dest_path=posixpath.join(dest_dir, rel_path),
+            upload_if=self.UploadIf.ALWAYS,
+            **kwargs)
+        q.task_done()
+    for _ in range(num_threads):
+      t = threading.Thread(target=worker)
+      t.daemon = True
+      t.start()
+
+    # Block until all files have been uploaded and all workers have exited.
+    q.join()

   def download_file(self, source_bucket, source_path, dest_path,
                     create_subdirs_if_needed=False):
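
[Sketch of how the new num_threads parameter might be used -- the bucket name,
local path, and destination directory are made-up examples, and this assumes
py/utils is on PYTHONPATH so gs_utils can be imported directly.]

    import gs_utils

    gs = gs_utils.GSUtils(boto_file_path='/home/me/.boto')
    # Upload a local tree with 20 concurrent upload threads, skipping any
    # files whose contents already match what is in the bucket.
    gs.upload_dir_contents(
        source_dir='/tmp/render_output',
        dest_bucket='example-bucket',
        dest_dir='renders/build-123',
        num_threads=20,
        upload_if=gs_utils.GSUtils.UploadIf.IF_MODIFIED)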