| Index: third_party/gsutil/gslib/boto_translation.py
 | 
| diff --git a/third_party/gsutil/gslib/boto_translation.py b/third_party/gsutil/gslib/boto_translation.py
 | 
| index 92013904c47ffa66a82776e3d2f6da29ebcff54b..edb69ddcb2e1ddab89a9982a4102c4528b058344 100644
 | 
| --- a/third_party/gsutil/gslib/boto_translation.py
 | 
| +++ b/third_party/gsutil/gslib/boto_translation.py
 | 
| @@ -30,6 +30,7 @@ import re
 | 
|  import socket
 | 
|  import tempfile
 | 
|  import textwrap
 | 
| +import threading
 | 
|  import time
 | 
|  import xml
 | 
|  from xml.dom.minidom import parseString as XmlParseString
 | 
| @@ -74,6 +75,7 @@ from gslib.translation_helper import AclTranslation
 | 
|  from gslib.translation_helper import AddS3MarkerAclToObjectMetadata
 | 
|  from gslib.translation_helper import CorsTranslation
 | 
|  from gslib.translation_helper import CreateBucketNotFoundException
 | 
| +from gslib.translation_helper import CreateNotFoundExceptionForObjectWrite
 | 
|  from gslib.translation_helper import CreateObjectNotFoundException
 | 
|  from gslib.translation_helper import DEFAULT_CONTENT_TYPE
 | 
|  from gslib.translation_helper import EncodeStringAsLong
 | 
| @@ -84,10 +86,8 @@ from gslib.translation_helper import REMOVE_CORS_CONFIG
 | 
|  from gslib.translation_helper import S3MarkerAclFromObjectMetadata
 | 
|  from gslib.util import ConfigureNoOpAuthIfNeeded
 | 
|  from gslib.util import DEFAULT_FILE_BUFFER_SIZE
 | 
| -from gslib.util import GetFileSize
 | 
|  from gslib.util import GetMaxRetryDelay
 | 
|  from gslib.util import GetNumRetries
 | 
| -from gslib.util import MultiprocessingIsAvailable
 | 
|  from gslib.util import S3_DELETE_MARKER_GUID
 | 
|  from gslib.util import TWO_MIB
 | 
|  from gslib.util import UnaryDictToXml
 | 
| @@ -101,8 +101,11 @@ TRANSLATABLE_BOTO_EXCEPTIONS = (boto.exception.BotoServerError,
 | 
|                                  boto.exception.StorageCreateError,
 | 
|                                  boto.exception.StorageResponseError)
 | 
|  
 | 
| -# If multiprocessing is available, this will be overridden to a (thread-safe)
 | 
| -# multiprocessing.Value in a call to InitializeMultiprocessingVariables.
 | 
| +# pylint: disable=global-at-module-level
 | 
| +global boto_auth_initialized, boto_auth_initialized_lock
 | 
| +# If multiprocessing is available, these will be overridden to process-safe
 | 
| +# variables in InitializeMultiprocessingVariables.
 | 
| +boto_auth_initialized_lock = threading.Lock()
 | 
|  boto_auth_initialized = False
 | 
|  
 | 
|  NON_EXISTENT_OBJECT_REGEX = re.compile(r'.*non-\s*existent\s*object',
 | 
| @@ -111,16 +114,36 @@ NON_EXISTENT_OBJECT_REGEX = re.compile(r'.*non-\s*existent\s*object',
 | 
|  MD5_REGEX = re.compile(r'^"*[a-fA-F0-9]{32}"*$')
 | 
|  
 | 
|  
 | 
| -def InitializeMultiprocessingVariables():
 | 
| +def InitializeMultiprocessingVariables():  # pylint: disable=invalid-name
 | 
|    """Perform necessary initialization for multiprocessing.
 | 
|  
 | 
|      See gslib.command.InitializeMultiprocessingVariables for an explanation
 | 
|      of why this is necessary.
 | 
|    """
 | 
| -  global boto_auth_initialized  # pylint: disable=global-variable-undefined
 | 
| +  # pylint: disable=global-variable-undefined
 | 
| +  global boto_auth_initialized, boto_auth_initialized_lock
 | 
| +  boto_auth_initialized_lock = gslib.util.CreateLock()
 | 
|    boto_auth_initialized = multiprocessing.Value('i', 0)
 | 
|  
 | 
|  
 | 
| +class DownloadProxyCallbackHandler(object):
 | 
| +  """Progress callback proxy that offsets byte counts by a start byte."""
 | 
| +
 | 
| +  def __init__(self, start_byte, callback):
 | 
| +    self._start_byte = start_byte
 | 
| +    self._callback = callback
 | 
| +
 | 
| +  def call(self, bytes_downloaded, total_size):
 | 
| +    """Calls the callback, if set, with start_byte added to the byte count.
 | 
| +
 | 
| +    Args:
 | 
| +      bytes_downloaded: Bytes processed so far; start_byte is added to it.
 | 
| +      total_size: Total size of the operation, passed through unchanged.
 | 
| +    """
 | 
| +    if self._callback:
 | 
| +      self._callback(self._start_byte + bytes_downloaded, total_size)
 | 
| +
 | 
| +
 | 
|  class BotoTranslation(CloudApi):
 | 
|    """Boto-based XML translation implementation of gsutil Cloud API.
 | 
|  
 | 
| @@ -130,7 +153,7 @@ class BotoTranslation(CloudApi):
 | 
|    """
 | 
|  
 | 
|    def __init__(self, bucket_storage_uri_class, logger, provider=None,
 | 
| -               credentials=None, debug=0):
 | 
| +               credentials=None, debug=0, trace_token=None):
 | 
|      """Performs necessary setup for interacting with the cloud storage provider.
 | 
|  
 | 
|      Args:
 | 
| @@ -142,17 +165,19 @@ class BotoTranslation(CloudApi):
 | 
|                  the provider argument and use this one instead.
 | 
|        credentials: Unused.
 | 
|        debug: Debug level for the API implementation (0..3).
 | 
| +      trace_token: Unused in this subclass.
 | 
|      """
 | 
|      super(BotoTranslation, self).__init__(bucket_storage_uri_class, logger,
 | 
|                                            provider=provider, debug=debug)
 | 
|      _ = credentials
 | 
| -    global boto_auth_initialized  # pylint: disable=global-variable-undefined
 | 
| -    if MultiprocessingIsAvailable()[0] and not boto_auth_initialized.value:
 | 
| -      ConfigureNoOpAuthIfNeeded()
 | 
| -      boto_auth_initialized.value = 1
 | 
| -    elif not boto_auth_initialized:
 | 
| +    # pylint: disable=global-variable-undefined, global-variable-not-assigned
 | 
| +    global boto_auth_initialized, boto_auth_initialized_lock
 | 
| +    with boto_auth_initialized_lock:
 | 
|        ConfigureNoOpAuthIfNeeded()
 | 
| -      boto_auth_initialized = True
 | 
| +      if isinstance(boto_auth_initialized, bool):
 | 
| +        boto_auth_initialized = True
 | 
| +      else:
 | 
| +        boto_auth_initialized.value = 1
 | 
|      self.api_version = boto.config.get_value(
 | 
|          'GSUtil', 'default_api_version', '1')
 | 
|  
 | 
| @@ -318,8 +343,10 @@ class BotoTranslation(CloudApi):
 | 
|      _ = provider
 | 
|      get_fields = self._ListToGetFields(list_fields=fields)
 | 
|      bucket_uri = self._StorageUriForBucket(bucket_name)
 | 
| -    prefix_list = []
 | 
|      headers = {}
 | 
| +    yield_prefixes = fields is None or 'prefixes' in fields
 | 
| +    yield_objects = fields is None or any(
 | 
| +        field.startswith('items/') for field in fields)
 | 
|      self._AddApiVersionToHeaders(headers)
 | 
|      try:
 | 
|        objects_iter = bucket_uri.list_bucket(prefix=prefix or '',
 | 
| @@ -331,11 +358,10 @@ class BotoTranslation(CloudApi):
 | 
|  
 | 
|      try:
 | 
|        for key in objects_iter:
 | 
| -        if isinstance(key, Prefix):
 | 
| -          prefix_list.append(key.name)
 | 
| +        if yield_prefixes and isinstance(key, Prefix):
 | 
|            yield CloudApi.CsObjectOrPrefix(key.name,
 | 
|                                            CloudApi.CsObjectOrPrefixType.PREFIX)
 | 
| -        else:
 | 
| +        elif yield_objects:
 | 
|            key_to_convert = key
 | 
|  
 | 
|            # Listed keys are populated with these fields during bucket listing.
 | 
| @@ -407,11 +433,11 @@ class BotoTranslation(CloudApi):
 | 
|      self._AddApiVersionToHeaders(headers)
 | 
|      if 'accept-encoding' not in headers:
 | 
|        headers['accept-encoding'] = 'gzip'
 | 
| -    if end_byte:
 | 
| +    if end_byte is not None:
 | 
|        headers['range'] = 'bytes=%s-%s' % (start_byte, end_byte)
 | 
|      elif start_byte > 0:
 | 
|        headers['range'] = 'bytes=%s-' % (start_byte)
 | 
| -    else:
 | 
| +    elif start_byte < 0:
 | 
|        headers['range'] = 'bytes=%s' % (start_byte)
 | 
|  
 | 
|      # Since in most cases we already made a call to get the object metadata,
 | 
| @@ -443,7 +469,8 @@ class BotoTranslation(CloudApi):
 | 
|      try:
 | 
|        if download_strategy is CloudApi.DownloadStrategy.RESUMABLE:
 | 
|          self._PerformResumableDownload(
 | 
| -            download_stream, key, headers=headers, callback=progress_callback,
 | 
| +            download_stream, start_byte, end_byte, key,
 | 
| +            headers=headers, callback=progress_callback,
 | 
|              num_callbacks=num_progress_callbacks, hash_algs=hash_algs)
 | 
|        elif download_strategy is CloudApi.DownloadStrategy.ONE_SHOT:
 | 
|          self._PerformSimpleDownload(
 | 
| @@ -505,13 +532,16 @@ class BotoTranslation(CloudApi):
 | 
|        key.get_contents_to_file(download_stream, cb=progress_callback,
 | 
|                                 num_cb=num_progress_callbacks, headers=headers)
 | 
|  
 | 
| -  def _PerformResumableDownload(self, fp, key, headers=None, callback=None,
 | 
| +  def _PerformResumableDownload(self, fp, start_byte, end_byte, key,
 | 
| +                                headers=None, callback=None,
 | 
|                                  num_callbacks=XML_PROGRESS_CALLBACKS,
 | 
|                                  hash_algs=None):
 | 
|      """Downloads bytes from key to fp, resuming as needed.
 | 
|  
 | 
|      Args:
 | 
| -      fp: File pointer into which data should be downloaded
 | 
| +      fp: File pointer into which data should be downloaded.
 | 
| +      start_byte: Start byte of the download.
 | 
| +      end_byte: End byte of the download.
 | 
|        key: Key object from which data is to be downloaded
 | 
|        headers: Headers to send when retrieving the file
 | 
|        callback: (optional) a callback function that will be called to report
 | 
| @@ -540,37 +570,22 @@ class BotoTranslation(CloudApi):
 | 
|  
 | 
|      num_retries = GetNumRetries()
 | 
|      progress_less_iterations = 0
 | 
| +    last_progress_byte = start_byte
 | 
|  
 | 
|      while True:  # Retry as long as we're making progress.
 | 
| -      had_file_bytes_before_attempt = GetFileSize(fp)
 | 
|        try:
 | 
| -        cur_file_size = GetFileSize(fp, position_to_eof=True)
 | 
| -
 | 
| -        def DownloadProxyCallback(total_bytes_downloaded, total_size):
 | 
| -          """Translates a boto callback into a gsutil Cloud API callback.
 | 
| -
 | 
| -          Callbacks are originally made by boto.s3.Key.get_file(); here we take
 | 
| -          into account that we're resuming a download.
 | 
| -
 | 
| -          Args:
 | 
| -            total_bytes_downloaded: Actual bytes downloaded so far, not
 | 
| -                                    including the point we resumed from.
 | 
| -            total_size: Total size of the download.
 | 
| -          """
 | 
| -          if callback:
 | 
| -            callback(cur_file_size + total_bytes_downloaded, total_size)
 | 
| -
 | 
| +        cb_handler = DownloadProxyCallbackHandler(start_byte, callback)
 | 
|          headers = headers.copy()
 | 
| -        headers['Range'] = 'bytes=%d-%d' % (cur_file_size, key.size - 1)
 | 
| -        cb = DownloadProxyCallback
 | 
| +        headers['Range'] = 'bytes=%d-%d' % (start_byte, end_byte)
 | 
|  
 | 
|          # Disable AWSAuthConnection-level retry behavior, since that would
 | 
|          # cause downloads to restart from scratch.
 | 
|          try:
 | 
| -          key.get_file(fp, headers, cb, num_callbacks, override_num_retries=0,
 | 
| -                       hash_algs=hash_algs)
 | 
| +          key.get_file(fp, headers, cb_handler.call, num_callbacks,
 | 
| +                       override_num_retries=0, hash_algs=hash_algs)
 | 
|          except TypeError:
 | 
| -          key.get_file(fp, headers, cb, num_callbacks, override_num_retries=0)
 | 
| +          key.get_file(fp, headers, cb_handler.call, num_callbacks,
 | 
| +                       override_num_retries=0)
 | 
|          fp.flush()
 | 
|          # Download succeeded.
 | 
|          return
 | 
| @@ -583,9 +598,10 @@ class BotoTranslation(CloudApi):
 | 
|            # so we need to close and reopen the key before resuming
 | 
|            # the download.
 | 
|            if self.provider == 's3':
 | 
| -            key.get_file(fp, headers, cb, num_callbacks, override_num_retries=0)
 | 
| +            key.get_file(fp, headers, cb_handler.call, num_callbacks,
 | 
| +                         override_num_retries=0)
 | 
|            else:  # self.provider == 'gs'
 | 
| -            key.get_file(fp, headers, cb, num_callbacks,
 | 
| +            key.get_file(fp, headers, cb_handler.call, num_callbacks,
 | 
|                           override_num_retries=0, hash_algs=hash_algs)
 | 
|        except BotoResumableDownloadException, e:
 | 
|          if (e.disposition ==
 | 
| @@ -597,7 +613,9 @@ class BotoTranslation(CloudApi):
 | 
|                               'retry', e.message)
 | 
|  
 | 
|        # At this point we had a re-tryable failure; see if made progress.
 | 
| -      if GetFileSize(fp) > had_file_bytes_before_attempt:
 | 
| +      start_byte = fp.tell()
 | 
| +      if start_byte > last_progress_byte:
 | 
| +        last_progress_byte = start_byte
 | 
|          progress_less_iterations = 0
 | 
|        else:
 | 
|          progress_less_iterations += 1
 | 
| @@ -819,8 +837,11 @@ class BotoTranslation(CloudApi):
 | 
|        return self._HandleSuccessfulUpload(dst_uri, object_metadata,
 | 
|                                            fields=fields)
 | 
|      except TRANSLATABLE_BOTO_EXCEPTIONS, e:
 | 
| +      not_found_exception = CreateNotFoundExceptionForObjectWrite(
 | 
| +          self.provider, object_metadata.bucket)
 | 
|        self._TranslateExceptionAndRaise(e, bucket_name=object_metadata.bucket,
 | 
| -                                       object_name=object_metadata.name)
 | 
| +                                       object_name=object_metadata.name,
 | 
| +                                       not_found_exception=not_found_exception)
 | 
|  
 | 
|    def UploadObjectStreaming(self, upload_stream, object_metadata,
 | 
|                              canned_acl=None, progress_callback=None,
 | 
| @@ -836,8 +857,11 @@ class BotoTranslation(CloudApi):
 | 
|        return self._HandleSuccessfulUpload(dst_uri, object_metadata,
 | 
|                                            fields=fields)
 | 
|      except TRANSLATABLE_BOTO_EXCEPTIONS, e:
 | 
| +      not_found_exception = CreateNotFoundExceptionForObjectWrite(
 | 
| +          self.provider, object_metadata.bucket)
 | 
|        self._TranslateExceptionAndRaise(e, bucket_name=object_metadata.bucket,
 | 
| -                                       object_name=object_metadata.name)
 | 
| +                                       object_name=object_metadata.name,
 | 
| +                                       not_found_exception=not_found_exception)
 | 
|  
 | 
|    def UploadObject(self, upload_stream, object_metadata, canned_acl=None,
 | 
|                     preconditions=None, size=None, progress_callback=None,
 | 
| @@ -860,8 +884,11 @@ class BotoTranslation(CloudApi):
 | 
|        return self._HandleSuccessfulUpload(dst_uri, object_metadata,
 | 
|                                            fields=fields)
 | 
|      except TRANSLATABLE_BOTO_EXCEPTIONS, e:
 | 
| +      not_found_exception = CreateNotFoundExceptionForObjectWrite(
 | 
| +          self.provider, object_metadata.bucket)
 | 
|        self._TranslateExceptionAndRaise(e, bucket_name=object_metadata.bucket,
 | 
| -                                       object_name=object_metadata.name)
 | 
| +                                       object_name=object_metadata.name,
 | 
| +                                       not_found_exception=not_found_exception)
 | 
|  
 | 
|    def DeleteObject(self, bucket_name, object_name, preconditions=None,
 | 
|                     generation=None, provider=None):
 | 
| @@ -920,8 +947,13 @@ class BotoTranslation(CloudApi):
 | 
|  
 | 
|        return self._BotoKeyToObject(new_key, fields=fields)
 | 
|      except TRANSLATABLE_BOTO_EXCEPTIONS, e:
 | 
| -      self._TranslateExceptionAndRaise(e, dst_obj_metadata.bucket,
 | 
| -                                       dst_obj_metadata.name)
 | 
| +      not_found_exception = CreateNotFoundExceptionForObjectWrite(
 | 
| +          self.provider, dst_obj_metadata.bucket, src_provider=self.provider,
 | 
| +          src_bucket_name=src_obj_metadata.bucket,
 | 
| +          src_object_name=src_obj_metadata.name, src_generation=src_generation)
 | 
| +      self._TranslateExceptionAndRaise(e, bucket_name=dst_obj_metadata.bucket,
 | 
| +                                       object_name=dst_obj_metadata.name,
 | 
| +                                       not_found_exception=not_found_exception)
 | 
|  
 | 
|    def ComposeObject(self, src_objs_metadata, dst_obj_metadata,
 | 
|                      preconditions=None, provider=None, fields=None):
 | 
| @@ -1382,7 +1414,7 @@ class BotoTranslation(CloudApi):
 | 
|          raise
 | 
|  
 | 
|    def _TranslateExceptionAndRaise(self, e, bucket_name=None, object_name=None,
 | 
| -                                  generation=None):
 | 
| +                                  generation=None, not_found_exception=None):
 | 
|      """Translates a Boto exception and raises the translated or original value.
 | 
|  
 | 
|      Args:
 | 
| @@ -1390,6 +1422,7 @@ class BotoTranslation(CloudApi):
 | 
|        bucket_name: Optional bucket name in request that caused the exception.
 | 
|        object_name: Optional object name in request that caused the exception.
 | 
|        generation: Optional generation in request that caused the exception.
 | 
| +      not_found_exception: Optional exception to raise in the not-found case.
 | 
|  
 | 
|      Raises:
 | 
|        Translated CloudApi exception, or the original exception if it was not
 | 
| @@ -1397,14 +1430,14 @@ class BotoTranslation(CloudApi):
 | 
|      """
 | 
|      translated_exception = self._TranslateBotoException(
 | 
|          e, bucket_name=bucket_name, object_name=object_name,
 | 
| -        generation=generation)
 | 
| +        generation=generation, not_found_exception=not_found_exception)
 | 
|      if translated_exception:
 | 
|        raise translated_exception
 | 
|      else:
 | 
|        raise
 | 
|  
 | 
|    def _TranslateBotoException(self, e, bucket_name=None, object_name=None,
 | 
| -                              generation=None):
 | 
| +                              generation=None, not_found_exception=None):
 | 
|      """Translates boto exceptions into their gsutil Cloud API equivalents.
 | 
|  
 | 
|      Args:
 | 
| @@ -1412,6 +1445,7 @@ class BotoTranslation(CloudApi):
 | 
|        bucket_name: Optional bucket name in request that caused the exception.
 | 
|        object_name: Optional object name in request that caused the exception.
 | 
|        generation: Optional generation in request that caused the exception.
 | 
| +      not_found_exception: Optional exception to raise in the not-found case.
 | 
|  
 | 
|      Returns:
 | 
|        CloudStorageApiServiceException for translatable exceptions, None
 | 
| @@ -1425,14 +1459,20 @@ class BotoTranslation(CloudApi):
 | 
|        elif e.status == 401 or e.status == 403:
 | 
|          return AccessDeniedException(e.code, status=e.status, body=e.body)
 | 
|        elif e.status == 404:
 | 
| -        if bucket_name:
 | 
| +        if not_found_exception:
 | 
| +          # The exception is pre-constructed prior to translation; the HTTP
 | 
| +          # status code isn't available at that time.
 | 
| +          setattr(not_found_exception, 'status', e.status)
 | 
| +          return not_found_exception
 | 
| +        elif bucket_name:
 | 
|            if object_name:
 | 
|              return CreateObjectNotFoundException(e.status, self.provider,
 | 
|                                                   bucket_name, object_name,
 | 
|                                                   generation=generation)
 | 
|            return CreateBucketNotFoundException(e.status, self.provider,
 | 
|                                                 bucket_name)
 | 
| -        return NotFoundException(e.code, status=e.status, body=e.body)
 | 
| +        return NotFoundException(e.message, status=e.status, body=e.body)
 | 
| +
 | 
|        elif e.status == 409 and e.code and 'BucketNotEmpty' in e.code:
 | 
|          return NotEmptyException('BucketNotEmpty (%s)' % bucket_name,
 | 
|                                   status=e.status, body=e.body)
 | 
| 
 |