Chromium Code Reviews

Unified Diff: third_party/gsutil/gslib/daisy_chain_wrapper.py

Issue 1377933002: [catapult] - Copy Telemetry's gsutilz over to third_party. (Closed) Base URL: https://github.com/catapult-project/catapult.git@master
Patch Set: Rename to gsutil. Created 5 years, 3 months ago
Index: third_party/gsutil/gslib/daisy_chain_wrapper.py
diff --git a/third_party/gsutil/gslib/daisy_chain_wrapper.py b/third_party/gsutil/gslib/daisy_chain_wrapper.py
new file mode 100644
index 0000000000000000000000000000000000000000..4e5717df8077b8fe1e8e89d58468da3e082c1eb6
--- /dev/null
+++ b/third_party/gsutil/gslib/daisy_chain_wrapper.py
@@ -0,0 +1,282 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Wrapper for use in daisy-chained copies."""
+
+from collections import deque
+import os
+import threading
+import time
+
+from gslib.cloud_api import BadRequestException
+from gslib.cloud_api import CloudApi
+from gslib.util import CreateLock
+from gslib.util import TRANSFER_BUFFER_SIZE
+
+
+# This controls the number of bytes downloaded per GetObjectMedia request.
+# We do not buffer this many bytes in memory at a time; that is controlled by
+# DaisyChainWrapper.max_buffer_size. This is the upper bound of bytes that may
+# be unnecessarily downloaded if there is a break in the resumable upload.
+_DEFAULT_DOWNLOAD_CHUNK_SIZE = 1024*1024*100  # 100 MiB
+
+
+class BufferWrapper(object):
+ """Wraps the download file pointer to use our in-memory buffer."""
+
+ def __init__(self, daisy_chain_wrapper):
+ """Provides a buffered write interface for a file download.
+
+ Args:
+ daisy_chain_wrapper: DaisyChainWrapper instance to use for buffer and
+ locking.
+ """
+ self.daisy_chain_wrapper = daisy_chain_wrapper
+
+ def write(self, data): # pylint: disable=invalid-name
+ """Waits for space in the buffer, then writes data to the buffer."""
+ while True:
+ with self.daisy_chain_wrapper.lock:
+ if (self.daisy_chain_wrapper.bytes_buffered <
+ self.daisy_chain_wrapper.max_buffer_size):
+ break
+      # Buffer was full; yield thread priority so the upload can pull from it.
+ time.sleep(0)
+ data_len = len(data)
+ if data_len:
+ with self.daisy_chain_wrapper.lock:
+ self.daisy_chain_wrapper.buffer.append(data)
+ self.daisy_chain_wrapper.bytes_buffered += data_len
+
+
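+# Data-flow note: the download thread is the producer, filling the shared
+# deque via BufferWrapper.write() above; the upload thread is the consumer,
+# draining it via DaisyChainWrapper.read() below. The shared lock and the
+# bytes_buffered counter bound the memory held by this handoff to
+# max_buffer_size.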
+class DaisyChainWrapper(object):
+ """Wrapper class for daisy-chaining a cloud download to an upload.
+
+  This class instantiates a BufferWrapper object to buffer the download into
+  memory, consuming a maximum of max_buffer_size bytes. It implements enough
+  of the read and seek interface to support all of the operations necessary
+  to copy a file.
+
+  This class is coupled to the XML and JSON API implementations in that it
+  expects reads in small buffers, at most TRANSFER_BUFFER_SIZE bytes each.
+ """
+
+ def __init__(self, src_url, src_obj_size, gsutil_api, progress_callback=None,
+ download_chunk_size=_DEFAULT_DOWNLOAD_CHUNK_SIZE):
+ """Initializes the daisy chain wrapper.
+
+ Args:
+ src_url: Source CloudUrl to copy from.
+ src_obj_size: Size of source object.
+ gsutil_api: gsutil Cloud API to use for the copy.
+      progress_callback: Optional callback function for progress notifications
+          for the download thread. Receives calls with arguments
+          (bytes_transferred, total_size).
+      download_chunk_size: Integer number of bytes to download per
+          GetObjectMedia request. This is the upper bound of bytes that may be
+          unnecessarily downloaded if there is a break in the resumable upload.
+    """
+ # Current read position for the upload file pointer.
+ self.position = 0
+ self.buffer = deque()
+
+ self.bytes_buffered = 0
+ # Maximum amount of bytes in memory at a time.
+ self.max_buffer_size = 1024 * 1024 # 1 MiB
+
+ self._download_chunk_size = download_chunk_size
+
+ # We save one buffer's worth of data as a special case for boto,
+ # which seeks back one buffer and rereads to compute hashes. This is
+ # unnecessary because we can just compare cloud hash digests at the end,
+    # but it allows this to work without modifying boto.
+ self.last_position = 0
+ self.last_data = None
+
+ # Protects buffer, position, bytes_buffered, last_position, and last_data.
+ self.lock = CreateLock()
+
+ # Protects download_exception.
+ self.download_exception_lock = CreateLock()
+
+ self.src_obj_size = src_obj_size
+ self.src_url = src_url
+
+    # It is safe to share gsutil_api between the upload and download threads
+    # because the download thread calls only GetObjectMedia, which creates a
+    # new HTTP connection independent of gsutil_api. Thus, it will not share
+    # an HTTP connection with the upload.
+ self.gsutil_api = gsutil_api
+
+    # If self.download_thread dies due to an exception, the exception is
+    # saved here so that it can be re-raised in the upload thread.
+ self.download_exception = None
+ self.download_thread = None
+ self.progress_callback = progress_callback
+ self.stop_download = threading.Event()
+ self.StartDownloadThread(progress_callback=self.progress_callback)
+
+ def StartDownloadThread(self, start_byte=0, progress_callback=None):
+ """Starts the download thread for the source object (from start_byte)."""
+
+ def PerformDownload(start_byte, progress_callback):
+ """Downloads the source object in chunks.
+
+      This function checks the stop_download event and exits early if it is
+      set; the event should be set when there is an error during the
+      daisy-chain upload. This function can then be called again with the
+      upload's current position as start_byte.
+
+ Args:
+ start_byte: Byte from which to begin the download.
+ progress_callback: Optional callback function for progress
+ notifications. Receives calls with arguments
+ (bytes_transferred, total_size).
+ """
+ # TODO: Support resumable downloads. This would require the BufferWrapper
+ # object to support seek() and tell() which requires coordination with
+ # the upload.
+ try:
+ while start_byte + self._download_chunk_size < self.src_obj_size:
+          self.gsutil_api.GetObjectMedia(
+              self.src_url.bucket_name, self.src_url.object_name,
+              BufferWrapper(self), start_byte=start_byte,
+              end_byte=start_byte + self._download_chunk_size - 1,
+              generation=self.src_url.generation, object_size=self.src_obj_size,
+              download_strategy=CloudApi.DownloadStrategy.ONE_SHOT,
+              provider=self.src_url.scheme, progress_callback=progress_callback)
+ if self.stop_download.is_set():
+ # Download thread needs to be restarted, so exit.
+ self.stop_download.clear()
+ return
+ start_byte += self._download_chunk_size
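+        # The loop above fetched all full chunks; this final request omits
+        # end_byte, so it downloads the remainder of the object (from
+        # start_byte through the last byte).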
+        self.gsutil_api.GetObjectMedia(
+            self.src_url.bucket_name, self.src_url.object_name,
+            BufferWrapper(self), start_byte=start_byte,
+            generation=self.src_url.generation, object_size=self.src_obj_size,
+            download_strategy=CloudApi.DownloadStrategy.ONE_SHOT,
+            provider=self.src_url.scheme, progress_callback=progress_callback)
+ # We catch all exceptions here because we want to store them.
+    except Exception as e:  # pylint: disable=broad-except
+ # Save the exception so that it can be seen in the upload thread.
+ with self.download_exception_lock:
+ self.download_exception = e
+ raise
+
+ # TODO: If we do gzip encoding transforms mid-transfer, this will fail.
+ self.download_thread = threading.Thread(
+ target=PerformDownload,
+ args=(start_byte, progress_callback))
+ self.download_thread.start()
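+
+  # The thread started here is also restarted by seek() below: on an upload
+  # error, seek() sets stop_download, drains the buffer, and calls
+  # StartDownloadThread again from the upload's current offset.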
+
+ def read(self, amt=None): # pylint: disable=invalid-name
+ """Exposes a stream from the in-memory buffer to the upload."""
+ if self.position == self.src_obj_size or amt == 0:
+ # If there is no data left or 0 bytes were requested, return an empty
+      # string so callers can still call len() and read(0).
+ return ''
+ if amt is None or amt > TRANSFER_BUFFER_SIZE:
+ raise BadRequestException(
+ 'Invalid HTTP read size %s during daisy chain operation, '
+ 'expected <= %s.' % (amt, TRANSFER_BUFFER_SIZE))
+
+ while True:
+ with self.lock:
+ if self.buffer:
+ break
+ with self.download_exception_lock:
+ if self.download_exception:
+ # Download thread died, so we will never recover. Raise the
+ # exception that killed it.
+ raise self.download_exception # pylint: disable=raising-bad-type
+      # Buffer was empty; yield thread priority so the download thread can
+      # fill it.
+ time.sleep(0)
+ with self.lock:
+ # TODO: Need to handle the caller requesting less than a
+ # transfer_buffer_size worth of data.
+ data = self.buffer.popleft()
+ self.last_position = self.position
+ self.last_data = data
+ data_len = len(data)
+ self.position += data_len
+ self.bytes_buffered -= data_len
+ if data_len > amt:
+ raise BadRequestException(
+ 'Invalid read during daisy chain operation, got data of size '
+ '%s, expected size %s.' % (data_len, amt))
+ return data
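+
+  # Note: read() returns exactly one buffered chunk per call, so it relies on
+  # the download writing data in pieces of at most TRANSFER_BUFFER_SIZE bytes
+  # (the coupling described in the class docstring).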
+
+ def tell(self): # pylint: disable=invalid-name
+ with self.lock:
+ return self.position
+
+ def seek(self, offset, whence=os.SEEK_SET): # pylint: disable=invalid-name
+ restart_download = False
+ if whence == os.SEEK_END:
+ if offset:
+ raise IOError(
+ 'Invalid seek during daisy chain operation. Non-zero offset %s '
+ 'from os.SEEK_END is not supported' % offset)
+ with self.lock:
+ self.last_position = self.position
+ self.last_data = None
+ # Safe because we check position against src_obj_size in read.
+ self.position = self.src_obj_size
+ elif whence == os.SEEK_SET:
+ with self.lock:
+ if offset == self.position:
+ pass
+ elif offset == self.last_position:
+ self.position = self.last_position
+ if self.last_data:
+ # If we seek to end and then back, we won't have last_data; we'll
+ # get it on the next call to read.
+ self.buffer.appendleft(self.last_data)
+ self.bytes_buffered += len(self.last_data)
+ else:
+          # Once a download is complete, boto seeks to 0 and re-reads to
+          # compute the hash if an md5 isn't already present (for example, for
+          # a GCS composite object), so we have to re-download the whole
+          # object. Also, when daisy-chaining to a resumable upload, on error
+          # the service may have received any number of the bytes; the
+          # download needs to be restarted from that point.
+ restart_download = True
+
+ if restart_download:
+ self.stop_download.set()
+
+      # Drain any buffered bytes so the download thread can finish its
+      # blocked write, observe stop_download, and exit; then restart the
+      # thread at the desired position.
+ while self.download_thread.is_alive():
+ with self.lock:
+ while self.bytes_buffered:
+ self.bytes_buffered -= len(self.buffer.popleft())
+ time.sleep(0)
+
+ with self.lock:
+ self.position = offset
+ self.buffer = deque()
+ self.bytes_buffered = 0
+ self.last_position = 0
+ self.last_data = None
+ self.StartDownloadThread(start_byte=offset,
+ progress_callback=self.progress_callback)
+ else:
+ raise IOError('Daisy-chain download wrapper does not support '
+ 'seek mode %s' % whence)
+
+ def seekable(self): # pylint: disable=invalid-name
+ return True
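+
+# Seek semantics, in brief: seek(0, os.SEEK_END) jumps to EOF without
+# downloading anything (boto uses this with tell() to find the object size);
+# seeking back to the previous read position replays last_data from memory;
+# any other os.SEEK_SET offset stops the download thread, clears the buffer,
+# and restarts the download from that offset.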