Chromium Code Reviews

Unified Diff: third_party/gsutil/gslib/resumable_streaming_upload.py

Issue 1377933002: [catapult] - Copy Telemetry's gsutilz over to third_party. (Closed) Base URL: https://github.com/catapult-project/catapult.git@master
Patch Set: Rename to gsutil. Created 5 years, 3 months ago
Index: third_party/gsutil/gslib/resumable_streaming_upload.py
diff --git a/third_party/gsutil/gslib/resumable_streaming_upload.py b/third_party/gsutil/gslib/resumable_streaming_upload.py
new file mode 100644
index 0000000000000000000000000000000000000000..a32787d3abe710bd057e9cc03852ee53a45cbd01
--- /dev/null
+++ b/third_party/gsutil/gslib/resumable_streaming_upload.py
@@ -0,0 +1,196 @@
+# Copyright 2014 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Helper class for streaming resumable uploads."""
+
+import collections
+import os
+
+from gslib.exception import CommandException
+from gslib.util import GetJsonResumableChunkSize
+
+
+class ResumableStreamingJsonUploadWrapper(object):
+ """Wraps an input stream in a buffer for resumable uploads.
+
+ This class takes a non-seekable input stream, buffers it, and exposes it
+ as a stream with limited seek capabilities such that it can be used in a
+ resumable JSON API upload.
+
+ max_buffer_size bytes of buffering is supported.
+ """
+
+  def __init__(self, stream, max_buffer_size, test_small_buffer=False):
+    """Initializes the wrapper.
+
+    Args:
+      stream: Input stream.
+      max_buffer_size: Maximum size of internal buffer; should be >= the chunk
+          size of the resumable upload API to ensure that at least one full
+          chunk write can be replayed in the event of a server error.
+      test_small_buffer: Skip check for buffer size vs. chunk size, for testing.
+    """
+    self._orig_fp = stream
+
+    if not test_small_buffer and max_buffer_size < GetJsonResumableChunkSize():
+      raise CommandException('Resumable streaming upload created with buffer '
+                             'size %s, JSON resumable upload chunk size %s. '
+                             'Buffer size must be >= JSON resumable upload '
+                             'chunk size to ensure that uploads can be '
+                             'resumed.' % (max_buffer_size,
+                                           GetJsonResumableChunkSize()))
+
+    self._max_buffer_size = max_buffer_size
+    self._buffer = collections.deque()
+    self._buffer_start = 0
+    self._buffer_end = 0
+    self._position = 0
+
+  def read(self, size=-1):  # pylint: disable=invalid-name
+    """Reads from the wrapped stream.
+
+    Args:
+      size: The number of bytes to read. If omitted or negative, the entire
+          contents of the stream will be read and returned.
+
+    Returns:
+      Bytes from the wrapped stream.
+    """
+    read_all_bytes = size is None or size < 0
+    if read_all_bytes:
+      bytes_remaining = self._max_buffer_size
+    else:
+      bytes_remaining = size
+    data = b''
+    buffered_data = []
+    if self._position < self._buffer_end:
+      # There was a backwards seek, so read from the buffer first.
+
+      # TODO: Performance test to validate if it is worth re-aligning
+      # the buffers in this case. Also, seeking through the buffer for
+      # each read on a long catch-up is probably not performant, but we'd
+      # need a more complex data structure than a deque to get around this.
+      pos_in_buffer = self._buffer_start
+      buffer_index = 0
+      # First, find the start position in the buffer.
+      while pos_in_buffer + len(self._buffer[buffer_index]) < self._position:
+        # When this loop exits, buffer_index will refer to a buffer that
+        # has at least some overlap with self._position, and
+        # pos_in_buffer will be <= self._position.
+        pos_in_buffer += len(self._buffer[buffer_index])
+        buffer_index += 1
+
+      # Read until we've read enough or we're out of buffer.
+      while pos_in_buffer < self._buffer_end and bytes_remaining > 0:
+        buffer_len = len(self._buffer[buffer_index])
+        # This describes how far into the current buffer self._position is.
+        offset_from_position = self._position - pos_in_buffer
+        bytes_available_this_buffer = buffer_len - offset_from_position
+        read_size = min(bytes_available_this_buffer, bytes_remaining)
+        buffered_data.append(
+            self._buffer[buffer_index]
+            [offset_from_position:offset_from_position + read_size])
+        bytes_remaining -= read_size
+        pos_in_buffer += buffer_len
+        buffer_index += 1
+        self._position += read_size
+
+    # At this point we're guaranteed that if there are any bytes left to read,
+    # then self._position == self._buffer_end, and we can read from the
+    # wrapped stream if needed.
+    if read_all_bytes:
+      # TODO: The user is requesting reading until the end of an
+      # arbitrary-length stream, which is bad: we'll need to return data
+      # with no size limits; if the stream is sufficiently long, we could run
+      # out of memory. We could break this down into smaller reads and
+      # buffer it as we go, but we're still left returning the data all at
+      # once to the caller. We could raise, but for now trust the caller to
+      # be sane and have enough memory to hold the remaining stream contents.
+      new_data = self._orig_fp.read(size)
+      data_len = len(new_data)
+      if not buffered_data:
+        data = new_data
+      else:
+        buffered_data.append(new_data)
+        data = b''.join(buffered_data)
+      self._position += data_len
+    elif bytes_remaining:
+      new_data = self._orig_fp.read(bytes_remaining)
+      if not buffered_data:
+        data = new_data
+      else:
+        buffered_data.append(new_data)
+        data = b''.join(buffered_data)
+      data_len = len(new_data)
+      if data_len:
+        self._position += data_len
+        self._buffer.append(new_data)
+        self._buffer_end += data_len
+        oldest_data = None
+        while self._buffer_end - self._buffer_start > self._max_buffer_size:
+          oldest_data = self._buffer.popleft()
+          self._buffer_start += len(oldest_data)
+        if oldest_data:
+          refill_amount = self._max_buffer_size - (self._buffer_end -
+                                                   self._buffer_start)
+          if refill_amount:
+            self._buffer.appendleft(oldest_data[-refill_amount:])
+            self._buffer_start -= refill_amount
+    else:
+      data = b''.join(buffered_data) if buffered_data else b''
+
+    return data
+
+  def tell(self):  # pylint: disable=invalid-name
+    """Returns the current stream position."""
+    return self._position
+
+  def seekable(self):  # pylint: disable=invalid-name
+    """Returns true since limited seek support exists."""
+    return True
+
+  def seek(self, offset, whence=os.SEEK_SET):  # pylint: disable=invalid-name
+    """Seeks on the buffered stream.
+
+    Args:
+      offset: The offset to seek to; must be within the buffer bounds.
+      whence: os.SEEK_SET or os.SEEK_END (os.SEEK_CUR is not supported).
+
+    Raises:
+      CommandException if an unsupported seek mode or position is used.
+    """
+    if whence == os.SEEK_SET:
+      if offset < self._buffer_start or offset > self._buffer_end:
+        raise CommandException('Unable to resume upload because of limited '
+                               'buffering available for streaming uploads. '
+                               'Offset %s was requested, but only data from '
+                               '%s to %s is buffered.' %
+                               (offset, self._buffer_start, self._buffer_end))
+      # Move to a position within the buffer.
+      self._position = offset
+    elif whence == os.SEEK_END:
+      if offset > self._max_buffer_size:
+        raise CommandException('Invalid SEEK_END offset %s on streaming '
+                               'upload. Only %s can be buffered.' %
+                               (offset, self._max_buffer_size))
+      # Read to the end and rely on buffering to handle the offset.
+      while self.read(self._max_buffer_size):
+        pass
+      # Now we're at the end.
+      self._position -= offset
+    else:
+      raise CommandException('Invalid seek mode on streaming upload. '
+                             '(mode %s, offset %s)' % (whence, offset))
+
+  def close(self):  # pylint: disable=invalid-name
+    return self._orig_fp.close()
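
A minimal usage sketch of the wrapper (not part of this patch; it assumes
gslib is importable and uses io.BytesIO as a stand-in for a non-seekable
source, with test_small_buffer=True so a toy buffer size bypasses the
chunk-size check a real caller must satisfy):

import io
import os

from gslib.resumable_streaming_upload import (
    ResumableStreamingJsonUploadWrapper)

# A 100-byte source; the wrapper never relies on the stream's own seeking.
source = io.BytesIO(b'0123456789' * 10)
wrapped = ResumableStreamingJsonUploadWrapper(
    source, max_buffer_size=32, test_small_buffer=True)

first = wrapped.read(16)   # Pulled from the source and retained in the deque.
assert wrapped.tell() == 16

wrapped.seek(4)            # Backwards seek; must stay within buffered bounds.
replay = wrapped.read(12)  # Served entirely from the buffer, not the source.
assert replay == first[4:16]

wrapped.seek(0, os.SEEK_END)  # Drains the source, keeping the last 32 bytes.
assert wrapped.tell() == 100

A SEEK_SET offset outside [self._buffer_start, self._buffer_end], or a
SEEK_END offset larger than max_buffer_size, raises CommandException: the
buffer cannot replay that far back, so the upload cannot be resumed from
that position.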
