Chromium Code Reviews

Unified Diff: tools/telemetry/third_party/gsutil/gslib/daisy_chain_wrapper.py

Issue 1260493004: Revert "Add gsutil 4.13 to telemetry/third_party" (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 5 months ago
Index: tools/telemetry/third_party/gsutil/gslib/daisy_chain_wrapper.py
diff --git a/tools/telemetry/third_party/gsutil/gslib/daisy_chain_wrapper.py b/tools/telemetry/third_party/gsutil/gslib/daisy_chain_wrapper.py
deleted file mode 100644
index 4e5717df8077b8fe1e8e89d58468da3e082c1eb6..0000000000000000000000000000000000000000
--- a/tools/telemetry/third_party/gsutil/gslib/daisy_chain_wrapper.py
+++ /dev/null
@@ -1,282 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Wrapper for use in daisy-chained copies."""
-
-from collections import deque
-import os
-import threading
-import time
-
-from gslib.cloud_api import BadRequestException
-from gslib.cloud_api import CloudApi
-from gslib.util import CreateLock
-from gslib.util import TRANSFER_BUFFER_SIZE
-
-
-# This controls the number of bytes downloaded per download request.
-# We do not buffer this many bytes in memory at a time - that is controlled by
-# DaisyChainWrapper.max_buffer_size. This is the upper bound on bytes that may
-# be unnecessarily downloaded if there is a break in the resumable upload.
-_DEFAULT_DOWNLOAD_CHUNK_SIZE = 1024*1024*100
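-# (1024*1024*100 bytes = 100 MiB per GetObjectMedia request, so a break in
-# the resumable upload re-downloads at most 100 MiB.)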
-
-
-class BufferWrapper(object):
- """Wraps the download file pointer to use our in-memory buffer."""
-
- def __init__(self, daisy_chain_wrapper):
- """Provides a buffered write interface for a file download.
-
- Args:
- daisy_chain_wrapper: DaisyChainWrapper instance to use for buffer and
- locking.
- """
- self.daisy_chain_wrapper = daisy_chain_wrapper
-
- def write(self, data): # pylint: disable=invalid-name
- """Waits for space in the buffer, then writes data to the buffer."""
- while True:
- with self.daisy_chain_wrapper.lock:
- if (self.daisy_chain_wrapper.bytes_buffered <
- self.daisy_chain_wrapper.max_buffer_size):
- break
- # Buffer was full; yield thread priority so the upload can pull from it.
- time.sleep(0)
- data_len = len(data)
- if data_len:
- with self.daisy_chain_wrapper.lock:
- self.daisy_chain_wrapper.buffer.append(data)
- self.daisy_chain_wrapper.bytes_buffered += data_len
-
-
-class DaisyChainWrapper(object):
- """Wrapper class for daisy-chaining a cloud download to an upload.
-
- This class instantiates a BufferWrapper object to buffer the download into
- memory, consuming a maximum of max_buffer_size. It implements intelligent
- behavior around read and seek that allows for all of the operations necessary
- to copy a file.
-
- This class is coupled with the XML and JSON implementations in that it
- expects that small buffers (at most TRANSFER_BUFFER_SIZE bytes) will be
- used.
- """
-
- def __init__(self, src_url, src_obj_size, gsutil_api, progress_callback=None,
- download_chunk_size=_DEFAULT_DOWNLOAD_CHUNK_SIZE):
- """Initializes the daisy chain wrapper.
-
- Args:
- src_url: Source CloudUrl to copy from.
- src_obj_size: Size of source object.
- gsutil_api: gsutil Cloud API to use for the copy.
- progress_callback: Optional callback function for progress notifications
- for the download thread. Receives calls with arguments
- (bytes_transferred, total_size).
- download_chunk_size: Integer number of bytes to download per
- GetObjectMedia request. This is the upper bound of bytes that may be
- unnecessarily downloaded if there is a break in the resumable upload.
-
- """
- # Current read position for the upload file pointer.
- self.position = 0
- self.buffer = deque()
-
- self.bytes_buffered = 0
- # Maximum number of bytes in memory at a time.
- self.max_buffer_size = 1024 * 1024 # 1 MiB
-
- self._download_chunk_size = download_chunk_size
-
- # We save one buffer's worth of data as a special case for boto,
- # which seeks back one buffer and rereads to compute hashes. This is
- # unnecessary because we can just compare cloud hash digests at the end,
- # but it allows this to work without modifying boto.
- self.last_position = 0
- self.last_data = None
-
- # Protects buffer, position, bytes_buffered, last_position, and last_data.
- self.lock = CreateLock()
-
- # Protects download_exception.
- self.download_exception_lock = CreateLock()
-
- self.src_obj_size = src_obj_size
- self.src_url = src_url
-
- # It is safe to share gsutil_api between the upload and download threads
- # because the download thread calls only GetObjectMedia, which creates a new
- # HTTP connection independent of gsutil_api. Thus, the download will not
- # share an HTTP connection with the upload.
- self.gsutil_api = gsutil_api
-
- # If self.download_thread dies due to an exception, it is saved here so
- # that it can also be raised in the upload thread.
- self.download_exception = None
- self.download_thread = None
- self.progress_callback = progress_callback
- self.stop_download = threading.Event()
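- # Construction eagerly starts the first download thread; read() will
- # block (via a sleep(0) polling loop) until data has been buffered.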
- self.StartDownloadThread(progress_callback=self.progress_callback)
-
- def StartDownloadThread(self, start_byte=0, progress_callback=None):
- """Starts the download thread for the source object (from start_byte)."""
-
- def PerformDownload(start_byte, progress_callback):
- """Downloads the source object in chunks.
-
- This function checks the stop_download event and exits early if it is set.
- The event should be set when there is an error during the daisy-chain
- upload; this function can then be called again with the upload's current
- position as start_byte.
-
- Args:
- start_byte: Byte from which to begin the download.
- progress_callback: Optional callback function for progress
- notifications. Receives calls with arguments
- (bytes_transferred, total_size).
- """
- # TODO: Support resumable downloads. This would require the BufferWrapper
- # object to support seek() and tell() which requires coordination with
- # the upload.
- try:
- while start_byte + self._download_chunk_size < self.src_obj_size:
- self.gsutil_api.GetObjectMedia(
- self.src_url.bucket_name, self.src_url.object_name,
- BufferWrapper(self), start_byte=start_byte,
- end_byte=start_byte + self._download_chunk_size - 1,
- generation=self.src_url.generation, object_size=self.src_obj_size,
- download_strategy=CloudApi.DownloadStrategy.ONE_SHOT,
- provider=self.src_url.scheme, progress_callback=progress_callback)
- if self.stop_download.is_set():
- # Download thread needs to be restarted, so exit.
- self.stop_download.clear()
- return
- start_byte += self._download_chunk_size
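- # Fetch the remaining bytes (the final, possibly partial, chunk); no
- # end_byte is passed, so the download runs to the end of the object.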
- self.gsutil_api.GetObjectMedia(
- self.src_url.bucket_name, self.src_url.object_name,
- BufferWrapper(self), start_byte=start_byte,
- generation=self.src_url.generation, object_size=self.src_obj_size,
- download_strategy=CloudApi.DownloadStrategy.ONE_SHOT,
- provider=self.src_url.scheme, progress_callback=progress_callback)
- # We catch all exceptions here because we want to store them.
- except Exception, e: # pylint: disable=broad-except
- # Save the exception so that it can be seen in the upload thread.
- with self.download_exception_lock:
- self.download_exception = e
- raise
-
- # TODO: If we do gzip encoding transforms mid-transfer, this will fail.
- self.download_thread = threading.Thread(
- target=PerformDownload,
- args=(start_byte, progress_callback))
- self.download_thread.start()
-
- def read(self, amt=None): # pylint: disable=invalid-name
- """Exposes a stream from the in-memory buffer to the upload."""
- if self.position == self.src_obj_size or amt == 0:
- # If there is no data left or 0 bytes were requested, return an empty
- # string so callers can still call len() and read(0).
- return ''
- if amt is None or amt > TRANSFER_BUFFER_SIZE:
- raise BadRequestException(
- 'Invalid HTTP read size %s during daisy chain operation, '
- 'expected <= %s.' % (amt, TRANSFER_BUFFER_SIZE))
-
- while True:
- with self.lock:
- if self.buffer:
- break
- with self.download_exception_lock:
- if self.download_exception:
- # Download thread died, so we will never recover. Raise the
- # exception that killed it.
- raise self.download_exception # pylint: disable=raising-bad-type
- # Buffer was empty; yield thread priority so the download thread can fill it.
- time.sleep(0)
- with self.lock:
- # TODO: Need to handle the caller requesting less than a
- # TRANSFER_BUFFER_SIZE worth of data.
- data = self.buffer.popleft()
- self.last_position = self.position
- self.last_data = data
- data_len = len(data)
- self.position += data_len
- self.bytes_buffered -= data_len
- if data_len > amt:
- raise BadRequestException(
- 'Invalid read during daisy chain operation, got data of size '
- '%s, expected size %s.' % (data_len, amt))
- return data
-
- def tell(self): # pylint: disable=invalid-name
- with self.lock:
- return self.position
-
- def seek(self, offset, whence=os.SEEK_SET): # pylint: disable=invalid-name
- restart_download = False
- if whence == os.SEEK_END:
- if offset:
- raise IOError(
- 'Invalid seek during daisy chain operation. Non-zero offset %s '
- 'from os.SEEK_END is not supported' % offset)
- with self.lock:
- self.last_position = self.position
- self.last_data = None
- # Safe because we check position against src_obj_size in read.
- self.position = self.src_obj_size
- elif whence == os.SEEK_SET:
- with self.lock:
- if offset == self.position:
- pass
- elif offset == self.last_position:
- self.position = self.last_position
- if self.last_data:
- # If we seek to end and then back, we won't have last_data; we'll
- # get it on the next call to read.
- self.buffer.appendleft(self.last_data)
- self.bytes_buffered += len(self.last_data)
- else:
- # Once a download is complete, boto seeks to 0 and re-reads to
- # compute the hash if an md5 isn't already present (for example a GCS
- # composite object), so we have to re-download the whole object.
- # Also, when daisy-chaining to a resumable upload, on error the
- # service may have received any number of the bytes; the download
- # needs to be restarted from that point.
- restart_download = True
-
- if restart_download:
- self.stop_download.set()
-
- # Consume any remaining bytes in the download thread so that
- # the thread can exit, then restart the thread at the desired position.
- while self.download_thread.is_alive():
- with self.lock:
- while self.bytes_buffered:
- self.bytes_buffered -= len(self.buffer.popleft())
- time.sleep(0)
-
- with self.lock:
- self.position = offset
- self.buffer = deque()
- self.bytes_buffered = 0
- self.last_position = 0
- self.last_data = None
- self.StartDownloadThread(start_byte=offset,
- progress_callback=self.progress_callback)
- else:
- raise IOError('Daisy-chain download wrapper does not support '
- 'seek mode %s' % whence)
-
- def seekable(self): # pylint: disable=invalid-name
- return True
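
For context, the sketch below shows roughly how a daisy-chain copy would drive this wrapper: construct it (which starts the download thread) and hand it to an upload as a file-like stream. It is illustrative only; UploadObject and its parameters are assumed stand-ins for the surrounding gsutil Cloud API, not code from this patch.

    # Minimal sketch, not from this patch. UploadObject and its parameters
    # are assumptions about the gsutil Cloud API call that consumes a
    # file-like source.
    from gslib.daisy_chain_wrapper import DaisyChainWrapper

    def DaisyChainCopy(src_url, src_obj_size, dst_obj_metadata, gsutil_api):
      # Constructing the wrapper starts the download thread immediately.
      upload_stream = DaisyChainWrapper(src_url, src_obj_size, gsutil_api)
      # The upload reads TRANSFER_BUFFER_SIZE pieces from the in-memory
      # buffer while the download refills it, so at most max_buffer_size
      # (1 MiB) is held in memory at a time.
      gsutil_api.UploadObject(
          upload_stream, object_metadata=dst_obj_metadata,  # assumed API
          size=src_obj_size, provider='gs')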