Index: third_party/gsutil/gslib/resumable_streaming_upload.py
diff --git a/third_party/gsutil/gslib/resumable_streaming_upload.py b/third_party/gsutil/gslib/resumable_streaming_upload.py
new file mode 100644
index 0000000000000000000000000000000000000000..a32787d3abe710bd057e9cc03852ee53a45cbd01
--- /dev/null
+++ b/third_party/gsutil/gslib/resumable_streaming_upload.py
@@ -0,0 +1,206 @@
+# Copyright 2014 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Helper class for streaming resumable uploads."""
+
+import collections
+import os
+
+from gslib.exception import CommandException
+from gslib.util import GetJsonResumableChunkSize
+
+
+class ResumableStreamingJsonUploadWrapper(object):
+  """Wraps an input stream in a buffer for resumable uploads.
+
+  This class takes a non-seekable input stream, buffers it, and exposes it
+  as a stream with limited seek capabilities such that it can be used in a
+  resumable JSON API upload.
+
+  Up to max_buffer_size bytes of buffering are supported.
+  """
+
+  def __init__(self, stream, max_buffer_size, test_small_buffer=False):
+    """Initializes the wrapper.
+
+    Args:
+      stream: Input stream.
+      max_buffer_size: Maximum size of internal buffer; should be >= the chunk
+          size of the resumable upload API to ensure that at least one full
+          chunk write can be replayed in the event of a server error.
+      test_small_buffer: Skip check for buffer size vs. chunk size, for testing.
+    """
+    self._orig_fp = stream
+
+    if not test_small_buffer and max_buffer_size < GetJsonResumableChunkSize():
+      raise CommandException('Resumable streaming upload created with buffer '
+                             'size %s, JSON resumable upload chunk size %s. '
+                             'Buffer size must be >= JSON resumable upload '
+                             'chunk size to ensure that uploads can be '
+                             'resumed.' % (max_buffer_size,
+                                           GetJsonResumableChunkSize()))
+
+    self._max_buffer_size = max_buffer_size
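+    # The deque holds consecutive chunks of already-read bytes covering
+    # stream offsets [_buffer_start, _buffer_end); _position is the
+    # caller-visible stream position.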
+    self._buffer = collections.deque()
+    self._buffer_start = 0
+    self._buffer_end = 0
+    self._position = 0
+
+  def read(self, size=-1):  # pylint: disable=invalid-name
+ """"Reads from the wrapped stream. |
+
+    Args:
+      size: The number of bytes to read. If omitted or negative, the entire
+          contents of the stream will be read and returned.
+
+    Returns:
+      Bytes from the wrapped stream.
+    """
+    read_all_bytes = size is None or size < 0
+    if read_all_bytes:
+      bytes_remaining = self._max_buffer_size
+    else:
+      bytes_remaining = size
+    data = b''
+    buffered_data = []
+    if self._position < self._buffer_end:
+      # There was a backwards seek, so read from the buffer first.
+
+      # TODO: Performance test to validate if it is worth re-aligning
+      # the buffers in this case. Also, seeking through the buffer for
+      # each read on a long catch-up is probably not performant, but we'd
+      # need a more complex data structure than a deque to get around this.
+      pos_in_buffer = self._buffer_start
+      buffer_index = 0
+      # First, find the start position in the buffer.
+      while pos_in_buffer + len(self._buffer[buffer_index]) < self._position:
+        # Skip over chunks that end before self._position. When this loop
+        # exits, the chunk at buffer_index ends at or beyond self._position,
+        # and pos_in_buffer (that chunk's start offset) is <= self._position.
+        pos_in_buffer += len(self._buffer[buffer_index])
+        buffer_index += 1
+
+      # Read until we've read enough or we're out of buffer.
+      while pos_in_buffer < self._buffer_end and bytes_remaining > 0:
+        buffer_len = len(self._buffer[buffer_index])
+        # This describes how far into the current buffer self._position is.
+        offset_from_position = self._position - pos_in_buffer
+        bytes_available_this_buffer = buffer_len - offset_from_position
+        read_size = min(bytes_available_this_buffer, bytes_remaining)
+        buffered_data.append(
+            self._buffer[buffer_index]
+            [offset_from_position:offset_from_position + read_size])
+        bytes_remaining -= read_size
+        pos_in_buffer += buffer_len
+        buffer_index += 1
+        self._position += read_size
+
+    # At this point we're guaranteed that if there are any bytes left to read,
+    # then self._position == self._buffer_end, and we can read from the
+    # wrapped stream if needed.
+    if read_all_bytes:
+      # TODO: The user is requesting reading until the end of an
+      # arbitrary-length stream, which is bad: we'll need to return data
+      # with no size limits; if the stream is sufficiently long, we could run
+      # out of memory. We could break this down into smaller reads and
+      # buffer it as we go, but we're still left returning the data all at
+      # once to the caller. We could raise, but for now trust the caller to
+      # be sane and have enough memory to hold the remaining stream contents.
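+      # Note: bytes read in this branch are not added to the buffer, so a
+      # subsequent backwards seek cannot replay them.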
+      new_data = self._orig_fp.read(size)
+      data_len = len(new_data)
+      if not buffered_data:
+        data = new_data
+      else:
+        buffered_data.append(new_data)
+        data = b''.join(buffered_data)
+      self._position += data_len
+    elif bytes_remaining:
+      new_data = self._orig_fp.read(bytes_remaining)
+      if not buffered_data:
+        data = new_data
+      else:
+        buffered_data.append(new_data)
+        data = b''.join(buffered_data)
+      data_len = len(new_data)
+      if data_len:
+        self._position += data_len
+        self._buffer.append(new_data)
+        self._buffer_end += data_len
+        oldest_data = None
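+        # If the buffer has grown past its maximum size, evict whole chunks
+        # from the front until it fits again.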
+        while self._buffer_end - self._buffer_start > self._max_buffer_size:
+          oldest_data = self._buffer.popleft()
+          self._buffer_start += len(oldest_data)
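+        # Evicting whole chunks may discard more bytes than necessary; put
+        # the tail of the last evicted chunk back so a full _max_buffer_size
+        # bytes stays buffered for backwards seeks.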
+        if oldest_data:
+          refill_amount = self._max_buffer_size - (self._buffer_end -
+                                                   self._buffer_start)
+          if refill_amount:
+            self._buffer.appendleft(oldest_data[-refill_amount:])
+            self._buffer_start -= refill_amount
+    else:
+      data = b''.join(buffered_data) if buffered_data else b''
+
+    return data
+
+  def tell(self):  # pylint: disable=invalid-name
+    """Returns the current stream position."""
+    return self._position
+
+  def seekable(self):  # pylint: disable=invalid-name
+    """Returns True since limited seek support exists."""
+    return True
+
+  def seek(self, offset, whence=os.SEEK_SET):  # pylint: disable=invalid-name
+    """Seeks on the buffered stream.
+
+    Args:
+      offset: The offset to seek to; must be within the buffer bounds.
+      whence: Must be os.SEEK_SET or os.SEEK_END; os.SEEK_CUR is unsupported.
+
+    Raises:
+      CommandException if an unsupported seek mode or position is used.
+    """
+    if whence == os.SEEK_SET:
+      if offset < self._buffer_start or offset > self._buffer_end:
+        raise CommandException('Unable to resume upload because of limited '
+                               'buffering available for streaming uploads. '
+                               'Offset %s was requested, but only data from '
+                               '%s to %s is buffered.' %
+                               (offset, self._buffer_start, self._buffer_end))
+      # Move to a position within the buffer.
+      self._position = offset
+    elif whence == os.SEEK_END:
+      if offset > self._max_buffer_size:
+        raise CommandException('Invalid SEEK_END offset %s on streaming '
+                               'upload. Only %s can be buffered.' %
+                               (offset, self._max_buffer_size))
+      # Read to the end and rely on buffering to handle the offset.
+      while self.read(self._max_buffer_size):
+        pass
+      # Now we're at the end.
+      self._position -= offset
+    else:
+      raise CommandException('Invalid seek mode on streaming upload. '
+                             '(mode %s, offset %s)' % (whence, offset))
+
+  def close(self):  # pylint: disable=invalid-name
+    return self._orig_fp.close()
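
A minimal usage sketch of the wrapper above, for orientation only: the 1 KiB
chunk size, the io.BytesIO stand-in for a non-seekable pipe, and the simulated
retry are all illustrative assumptions; in gsutil the chunk size comes from
GetJsonResumableChunkSize() and the JSON API upload machinery drives the reads
and seeks.

  import io
  import os

  from gslib.resumable_streaming_upload import ResumableStreamingJsonUploadWrapper

  CHUNK_SIZE = 1024  # Illustrative; gsutil uses GetJsonResumableChunkSize().

  # io.BytesIO stands in for a non-seekable source such as a pipe.
  # test_small_buffer=True skips the buffer-vs-chunk-size check so this
  # tiny illustrative buffer is accepted.
  raw = io.BytesIO(os.urandom(4 * CHUNK_SIZE))
  wrapped = ResumableStreamingJsonUploadWrapper(
      raw, max_buffer_size=CHUNK_SIZE, test_small_buffer=True)

  first = wrapped.read(CHUNK_SIZE)   # "Upload" chunk 1.
  second = wrapped.read(CHUNK_SIZE)  # "Upload" chunk 2; it is now buffered.
  assert len(first) == len(second) == CHUNK_SIZE

  # Simulate a server error on chunk 2: seek back within the buffered window
  # and replay exactly the same bytes.
  wrapped.seek(wrapped.tell() - CHUNK_SIZE)
  assert wrapped.read(CHUNK_SIZE) == second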