Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(208)

Unified Diff: tools/telemetry/third_party/gsutil/gslib/resumable_streaming_upload.py

Issue 1260493004: Revert "Add gsutil 4.13 to telemetry/third_party" (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: tools/telemetry/third_party/gsutil/gslib/resumable_streaming_upload.py
diff --git a/tools/telemetry/third_party/gsutil/gslib/resumable_streaming_upload.py b/tools/telemetry/third_party/gsutil/gslib/resumable_streaming_upload.py
deleted file mode 100644
index a32787d3abe710bd057e9cc03852ee53a45cbd01..0000000000000000000000000000000000000000
--- a/tools/telemetry/third_party/gsutil/gslib/resumable_streaming_upload.py
+++ /dev/null
@@ -1,196 +0,0 @@
-# Copyright 2014 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Helper class for streaming resumable uploads."""
-
-import collections
-import os
-
-from gslib.exception import CommandException
-from gslib.util import GetJsonResumableChunkSize
-
-
-class ResumableStreamingJsonUploadWrapper(object):
- """Wraps an input stream in a buffer for resumable uploads.
-
- This class takes a non-seekable input stream, buffers it, and exposes it
- as a stream with limited seek capabilities such that it can be used in a
- resumable JSON API upload.
-
- max_buffer_size bytes of buffering is supported.
- """
-
- def __init__(self, stream, max_buffer_size, test_small_buffer=False):
- """Initializes the wrapper.
-
- Args:
- stream: Input stream.
- max_buffer_size: Maximum size of internal buffer; should be >= the chunk
- size of the resumable upload API to ensure that at least one full
- chunk write can be replayed in the event of a server error.
- test_small_buffer: Skip check for buffer size vs. chunk size, for testing.
- """
- self._orig_fp = stream
-
- if not test_small_buffer and max_buffer_size < GetJsonResumableChunkSize():
- raise CommandException('Resumable streaming upload created with buffer '
- 'size %s, JSON resumable upload chunk size %s. '
- 'Buffer size must be >= JSON resumable upload '
- 'chunk size to ensure that uploads can be '
- 'resumed.' % (max_buffer_size,
- GetJsonResumableChunkSize()))
-
- self._max_buffer_size = max_buffer_size
- self._buffer = collections.deque()
- self._buffer_start = 0
- self._buffer_end = 0
- self._position = 0
-
- def read(self, size=-1): # pylint: disable=invalid-name
- """"Reads from the wrapped stream.
-
- Args:
- size: The amount of bytes to read. If omitted or negative, the entire
- contents of the stream will be read and returned.
-
- Returns:
- Bytes from the wrapped stream.
- """
- read_all_bytes = size is None or size < 0
- if read_all_bytes:
- bytes_remaining = self._max_buffer_size
- else:
- bytes_remaining = size
- data = b''
- buffered_data = []
- if self._position < self._buffer_end:
- # There was a backwards seek, so read from the buffer first.
-
- # TODO: Performance test to validate if it is worth re-aligning
- # the buffers in this case. Also, seeking through the buffer for
- # each read on a long catch-up is probably not performant, but we'd
- # need a more complex data structure than a deque to get around this.
- pos_in_buffer = self._buffer_start
- buffer_index = 0
- # First, find the start position in the buffer.
- while pos_in_buffer + len(self._buffer[buffer_index]) < self._position:
- # When this loop exits, buffer_index will refer to a buffer that
- # has at least some overlap with self._position, and
- # pos_in_buffer will be >= self._position
- pos_in_buffer += len(self._buffer[buffer_index])
- buffer_index += 1
-
- # Read until we've read enough or we're out of buffer.
- while pos_in_buffer < self._buffer_end and bytes_remaining > 0:
- buffer_len = len(self._buffer[buffer_index])
- # This describes how far into the current buffer self._position is.
- offset_from_position = self._position - pos_in_buffer
- bytes_available_this_buffer = buffer_len - offset_from_position
- read_size = min(bytes_available_this_buffer, bytes_remaining)
- buffered_data.append(
- self._buffer[buffer_index]
- [offset_from_position:offset_from_position + read_size])
- bytes_remaining -= read_size
- pos_in_buffer += buffer_len
- buffer_index += 1
- self._position += read_size
-
- # At this point we're guaranteed that if there are any bytes left to read,
- # then self._position == self._buffer_end, and we can read from the
- # wrapped stream if needed.
- if read_all_bytes:
- # TODO: The user is requesting reading until the end of an
- # arbitrary length stream, which is bad we'll need to return data
- # with no size limits; if the stream is sufficiently long, we could run
- # out of memory. We could break this down into smaller reads and
- # buffer it as we go, but we're still left returning the data all at
- # once to the caller. We could raise, but for now trust the caller to
- # be sane and have enough memory to hold the remaining stream contents.
- new_data = self._orig_fp.read(size)
- data_len = len(new_data)
- if not buffered_data:
- data = new_data
- else:
- buffered_data.append(new_data)
- data = b''.join(buffered_data)
- self._position += data_len
- elif bytes_remaining:
- new_data = self._orig_fp.read(bytes_remaining)
- if not buffered_data:
- data = new_data
- else:
- buffered_data.append(new_data)
- data = b''.join(buffered_data)
- data_len = len(new_data)
- if data_len:
- self._position += data_len
- self._buffer.append(new_data)
- self._buffer_end += data_len
- oldest_data = None
- while self._buffer_end - self._buffer_start > self._max_buffer_size:
- oldest_data = self._buffer.popleft()
- self._buffer_start += len(oldest_data)
- if oldest_data:
- refill_amount = self._max_buffer_size - (self._buffer_end -
- self._buffer_start)
- if refill_amount:
- self._buffer.appendleft(oldest_data[-refill_amount:])
- self._buffer_start -= refill_amount
- else:
- data = b''.join(buffered_data) if buffered_data else b''
-
- return data
-
- def tell(self): # pylint: disable=invalid-name
- """Returns the current stream position."""
- return self._position
-
- def seekable(self): # pylint: disable=invalid-name
- """Returns true since limited seek support exists."""
- return True
-
- def seek(self, offset, whence=os.SEEK_SET): # pylint: disable=invalid-name
- """Seeks on the buffered stream.
-
- Args:
- offset: The offset to seek to; must be within the buffer bounds.
- whence: Must be os.SEEK_SET.
-
- Raises:
- CommandException if an unsupported seek mode or position is used.
- """
- if whence == os.SEEK_SET:
- if offset < self._buffer_start or offset > self._buffer_end:
- raise CommandException('Unable to resume upload because of limited '
- 'buffering available for streaming uploads. '
- 'Offset %s was requested, but only data from '
- '%s to %s is buffered.' %
- (offset, self._buffer_start, self._buffer_end))
- # Move to a position within the buffer.
- self._position = offset
- elif whence == os.SEEK_END:
- if offset > self._max_buffer_size:
- raise CommandException('Invalid SEEK_END offset %s on streaming '
- 'upload. Only %s can be buffered.' %
- (offset, self._max_buffer_size))
- # Read to the end and rely on buffering to handle the offset.
- while self.read(self._max_buffer_size):
- pass
- # Now we're at the end.
- self._position -= offset
- else:
- raise CommandException('Invalid seek mode on streaming upload. '
- '(mode %s, offset %s)' % (whence, offset))
-
- def close(self): # pylint: disable=invalid-name
- return self._orig_fp.close()
« no previous file with comments | « tools/telemetry/third_party/gsutil/gslib/project_id.py ('k') | tools/telemetry/third_party/gsutil/gslib/sig_handling.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698