Index: tools/telemetry/third_party/gsutil/gslib/resumable_streaming_upload.py
diff --git a/tools/telemetry/third_party/gsutil/gslib/resumable_streaming_upload.py b/tools/telemetry/third_party/gsutil/gslib/resumable_streaming_upload.py
deleted file mode 100644
index a32787d3abe710bd057e9cc03852ee53a45cbd01..0000000000000000000000000000000000000000
--- a/tools/telemetry/third_party/gsutil/gslib/resumable_streaming_upload.py
+++ /dev/null
@@ -1,196 +0,0 @@
-# Copyright 2014 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Helper class for streaming resumable uploads."""
-
-import collections
-import os
-
-from gslib.exception import CommandException
-from gslib.util import GetJsonResumableChunkSize
-
-
-class ResumableStreamingJsonUploadWrapper(object):
-  """Wraps an input stream in a buffer for resumable uploads.
-
-  This class takes a non-seekable input stream, buffers it, and exposes it
-  as a stream with limited seek capabilities such that it can be used in a
-  resumable JSON API upload.
-
-  max_buffer_size bytes of buffering is supported.
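-
-  For example (stream and sizes illustrative only):
-
-    wrapper = ResumableStreamingJsonUploadWrapper(
-        sys.stdin, max_buffer_size=100 * 1024 * 1024)
-    data = wrapper.read(1024)        # Bytes are buffered as they are read.
-    wrapper.seek(0)                  # Rewind within the buffered region.
-    data_again = wrapper.read(1024)  # Replayed from the buffer, not stdin.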
- """ |
- |
- def __init__(self, stream, max_buffer_size, test_small_buffer=False): |
- """Initializes the wrapper. |
- |
- Args: |
- stream: Input stream. |
- max_buffer_size: Maximum size of internal buffer; should be >= the chunk |
- size of the resumable upload API to ensure that at least one full |
- chunk write can be replayed in the event of a server error. |
- test_small_buffer: Skip check for buffer size vs. chunk size, for testing. |
- """ |
- self._orig_fp = stream |
- |
- if not test_small_buffer and max_buffer_size < GetJsonResumableChunkSize(): |
- raise CommandException('Resumable streaming upload created with buffer ' |
- 'size %s, JSON resumable upload chunk size %s. ' |
- 'Buffer size must be >= JSON resumable upload ' |
- 'chunk size to ensure that uploads can be ' |
- 'resumed.' % (max_buffer_size, |
- GetJsonResumableChunkSize())) |
- |
- self._max_buffer_size = max_buffer_size |
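-    # _buffer is a deque of byte strings spanning the absolute stream
-    # offsets [_buffer_start, _buffer_end); _position is the absolute
-    # offset of the next byte that read() will return.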
-    self._buffer = collections.deque()
-    self._buffer_start = 0
-    self._buffer_end = 0
-    self._position = 0
-
-  def read(self, size=-1):  # pylint: disable=invalid-name
- """"Reads from the wrapped stream. |
- |
- Args: |
- size: The amount of bytes to read. If omitted or negative, the entire |
- contents of the stream will be read and returned. |
-
-    Returns:
-      Bytes from the wrapped stream.
-    """
-    read_all_bytes = size is None or size < 0
-    if read_all_bytes:
-      bytes_remaining = self._max_buffer_size
-    else:
-      bytes_remaining = size
-    data = b''
-    buffered_data = []
-    if self._position < self._buffer_end:
-      # There was a backwards seek, so read from the buffer first.
-
-      # TODO: Performance test to validate if it is worth re-aligning
-      # the buffers in this case. Also, seeking through the buffer for
-      # each read on a long catch-up is probably not performant, but we'd
-      # need a more complex data structure than a deque to get around this.
-      pos_in_buffer = self._buffer_start
-      buffer_index = 0
-      # First, find the start position in the buffer.
-      while pos_in_buffer + len(self._buffer[buffer_index]) < self._position:
-        # When this loop exits, buffer_index will refer to a buffer that
-        # has at least some overlap with self._position, and
-        # pos_in_buffer will be <= self._position.
-        pos_in_buffer += len(self._buffer[buffer_index])
-        buffer_index += 1
-
-      # Read until we've read enough or we're out of buffer.
-      while pos_in_buffer < self._buffer_end and bytes_remaining > 0:
-        buffer_len = len(self._buffer[buffer_index])
-        # This describes how far into the current buffer self._position is.
-        offset_from_position = self._position - pos_in_buffer
-        bytes_available_this_buffer = buffer_len - offset_from_position
-        read_size = min(bytes_available_this_buffer, bytes_remaining)
-        buffered_data.append(
-            self._buffer[buffer_index]
-            [offset_from_position:offset_from_position + read_size])
-        bytes_remaining -= read_size
-        pos_in_buffer += buffer_len
-        buffer_index += 1
-        self._position += read_size
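-      # For example, with buffered entries [b'abc', b'def'] and
-      # self._position == 2, read(3) appends b'c' from the first entry and
-      # b'de' from the second, leaving self._position == 5.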
-
-    # At this point we're guaranteed that if there are any bytes left to read,
-    # then self._position == self._buffer_end, and we can read from the
-    # wrapped stream if needed.
-    if read_all_bytes:
-      # TODO: The user is requesting reading until the end of an
-      # arbitrary-length stream, which is bad because we'll need to return
-      # data with no size limit; if the stream is sufficiently long, we could
-      # run out of memory. We could break this down into smaller reads and
-      # buffer it as we go, but we're still left returning the data all at
-      # once to the caller. We could raise, but for now trust the caller to
-      # be sane and have enough memory to hold the remaining stream contents.
-      new_data = self._orig_fp.read(size)
-      data_len = len(new_data)
-      if not buffered_data:
-        data = new_data
-      else:
-        buffered_data.append(new_data)
-        data = b''.join(buffered_data)
-      self._position += data_len
-    elif bytes_remaining:
-      new_data = self._orig_fp.read(bytes_remaining)
-      if not buffered_data:
-        data = new_data
-      else:
-        buffered_data.append(new_data)
-        data = b''.join(buffered_data)
-      data_len = len(new_data)
-      if data_len:
-        self._position += data_len
-        self._buffer.append(new_data)
-        self._buffer_end += data_len
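-        # Trim the buffer back down to max_buffer_size.  Popping whole
-        # entries may overshoot, so the tail of the last popped entry is
-        # pushed back afterwards to keep max_buffer_size bytes buffered.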
-        oldest_data = None
-        while self._buffer_end - self._buffer_start > self._max_buffer_size:
-          oldest_data = self._buffer.popleft()
-          self._buffer_start += len(oldest_data)
-        if oldest_data:
-          refill_amount = self._max_buffer_size - (self._buffer_end -
-                                                   self._buffer_start)
-          if refill_amount:
-            self._buffer.appendleft(oldest_data[-refill_amount:])
-            self._buffer_start -= refill_amount
-    else:
-      data = b''.join(buffered_data) if buffered_data else b''
-
-    return data
-
-  def tell(self):  # pylint: disable=invalid-name
-    """Returns the current stream position."""
-    return self._position
-
-  def seekable(self):  # pylint: disable=invalid-name
-    """Returns True because limited seek support exists."""
-    return True
-
-  def seek(self, offset, whence=os.SEEK_SET):  # pylint: disable=invalid-name
-    """Seeks on the buffered stream.
-
-    Args:
-      offset: The offset to seek to. For os.SEEK_SET this must be within
-          the buffer bounds; for os.SEEK_END it must be at most
-          max_buffer_size.
-      whence: os.SEEK_SET or os.SEEK_END.
-
-    Raises:
-      CommandException if an unsupported seek mode or position is used.
-    """
-    if whence == os.SEEK_SET:
-      if offset < self._buffer_start or offset > self._buffer_end:
-        raise CommandException('Unable to resume upload because of limited '
-                               'buffering available for streaming uploads. '
-                               'Offset %s was requested, but only data from '
-                               '%s to %s is buffered.' %
-                               (offset, self._buffer_start, self._buffer_end))
-      # Move to a position within the buffer.
-      self._position = offset
-    elif whence == os.SEEK_END:
-      if offset > self._max_buffer_size:
-        raise CommandException('Invalid SEEK_END offset %s on streaming '
-                               'upload. Only %s bytes can be buffered.' %
-                               (offset, self._max_buffer_size))
-      # Read to the end and rely on buffering to handle the offset.
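-      # For example, seek(10, os.SEEK_END) drains the wrapped stream and
-      # leaves the position 10 bytes before the end, so the final 10
-      # bytes can be re-read from the buffer.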
-      while self.read(self._max_buffer_size):
-        pass
-      # Now we're at the end.
-      self._position -= offset
-    else:
-      raise CommandException('Invalid seek mode on streaming upload. '
-                             '(mode %s, offset %s)' % (whence, offset))
-
-  def close(self):  # pylint: disable=invalid-name
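-    """Closes the wrapped stream."""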
-    return self._orig_fp.close()