# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper class for streaming resumable uploads."""

import collections
import os

from gslib.exception import CommandException
from gslib.util import GetJsonResumableChunkSize


class ResumableStreamingJsonUploadWrapper(object):
  """Wraps an input stream in a buffer for resumable uploads.

  This class takes a non-seekable input stream, buffers it, and exposes it
  as a stream with limited seek capabilities such that it can be used in a
  resumable JSON API upload.

  max_buffer_size bytes of buffering is supported.
  """

  def __init__(self, stream, max_buffer_size, test_small_buffer=False):
    """Initializes the wrapper.

    Args:
      stream: Input stream.
      max_buffer_size: Maximum size of internal buffer; should be >= the
          chunk size of the resumable upload API to ensure that at least one
          full chunk write can be replayed in the event of a server error.
      test_small_buffer: Skip check for buffer size vs. chunk size, for
          testing.
    """
    self._orig_fp = stream

    if not test_small_buffer and max_buffer_size < GetJsonResumableChunkSize():
      raise CommandException('Resumable streaming upload created with buffer '
                             'size %s, JSON resumable upload chunk size %s. '
                             'Buffer size must be >= JSON resumable upload '
                             'chunk size to ensure that uploads can be '
                             'resumed.' % (max_buffer_size,
                                           GetJsonResumableChunkSize()))

    self._max_buffer_size = max_buffer_size
    self._buffer = collections.deque()
    self._buffer_start = 0  # Stream offset of the first buffered byte.
    self._buffer_end = 0  # Stream offset just past the last buffered byte.
    self._position = 0  # Current read offset in the stream.

  def read(self, size=-1):  # pylint: disable=invalid-name
    """Reads from the wrapped stream.

    Args:
      size: The number of bytes to read. If omitted or negative, the entire
          contents of the stream will be read and returned.

    Returns:
      Bytes from the wrapped stream.
    """
    read_all_bytes = size is None or size < 0
    if read_all_bytes:
      bytes_remaining = self._max_buffer_size
    else:
      bytes_remaining = size
    data = b''
    buffered_data = []
    if self._position < self._buffer_end:
      # There was a backwards seek, so read from the buffer first.

      # TODO: Performance test to validate whether it is worth re-aligning
      # the buffers in this case. Also, seeking through the buffer for
      # each read on a long catch-up is probably not performant, but we'd
      # need a more complex data structure than a deque to get around this.
      pos_in_buffer = self._buffer_start
      buffer_index = 0
      # First, find the start position in the buffer.
      while pos_in_buffer + len(self._buffer[buffer_index]) < self._position:
        # When this loop exits, buffer_index will refer to a buffer that
        # has at least some overlap with self._position, and
        # pos_in_buffer will be <= self._position.
        pos_in_buffer += len(self._buffer[buffer_index])
        buffer_index += 1

      # Read until we've read enough or we're out of buffer.
      while pos_in_buffer < self._buffer_end and bytes_remaining > 0:
        buffer_len = len(self._buffer[buffer_index])
        # This describes how far into the current buffer self._position is.
        offset_from_position = self._position - pos_in_buffer
        bytes_available_this_buffer = buffer_len - offset_from_position
        read_size = min(bytes_available_this_buffer, bytes_remaining)
        buffered_data.append(
            self._buffer[buffer_index]
            [offset_from_position:offset_from_position + read_size])
        bytes_remaining -= read_size
        pos_in_buffer += buffer_len
        buffer_index += 1
        self._position += read_size

    # At this point we're guaranteed that if there are any bytes left to read,
    # then self._position == self._buffer_end, and we can read from the
    # wrapped stream if needed.
    if read_all_bytes:
      # TODO: The user is requesting a read to the end of an arbitrary-length
      # stream, which is bad: we'll need to return the data with no size
      # limit, and if the stream is sufficiently long, we could run out of
      # memory. We could break this down into smaller reads and buffer it as
      # we go, but we're still left returning all of the data to the caller
      # at once. We could raise, but for now trust the caller to be sane and
      # have enough memory to hold the remaining stream contents.
      new_data = self._orig_fp.read(size)
      data_len = len(new_data)
      if not buffered_data:
        data = new_data
      else:
        buffered_data.append(new_data)
        data = b''.join(buffered_data)
      self._position += data_len
    elif bytes_remaining:
      new_data = self._orig_fp.read(bytes_remaining)
      if not buffered_data:
        data = new_data
      else:
        buffered_data.append(new_data)
        data = b''.join(buffered_data)
      data_len = len(new_data)
      if data_len:
        self._position += data_len
        self._buffer.append(new_data)
        self._buffer_end += data_len
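        # Trim the buffer down to at most max_buffer_size bytes. popleft()
        # discards whole chunks, so the loop below may overshoot; the tail
        # of the last discarded chunk is re-added afterwards so that a full
        # max_buffer_size window remains replayable.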
        oldest_data = None
        while self._buffer_end - self._buffer_start > self._max_buffer_size:
          oldest_data = self._buffer.popleft()
          self._buffer_start += len(oldest_data)
        if oldest_data:
          refill_amount = self._max_buffer_size - (self._buffer_end -
                                                   self._buffer_start)
          if refill_amount:
            self._buffer.appendleft(oldest_data[-refill_amount:])
            self._buffer_start -= refill_amount
    else:
      data = b''.join(buffered_data) if buffered_data else b''

    return data

  def tell(self):  # pylint: disable=invalid-name
    """Returns the current stream position."""
    return self._position

  def seekable(self):  # pylint: disable=invalid-name
    """Returns True, since limited seek support exists."""
    return True

  def seek(self, offset, whence=os.SEEK_SET):  # pylint: disable=invalid-name
    """Seeks on the buffered stream.

    Args:
      offset: For os.SEEK_SET, the offset to seek to, which must lie within
          the buffered bounds. For os.SEEK_END, the number of bytes to rewind
          from the end of the stream, which must be <= max_buffer_size.
      whence: os.SEEK_SET or os.SEEK_END.

    Raises:
      CommandException if an unsupported seek mode or position is used.
    """
    if whence == os.SEEK_SET:
      if offset < self._buffer_start or offset > self._buffer_end:
        raise CommandException('Unable to resume upload because of limited '
                               'buffering available for streaming uploads. '
                               'Offset %s was requested, but only data from '
                               '%s to %s is buffered.' %
                               (offset, self._buffer_start, self._buffer_end))
      # Move to a position within the buffer.
      self._position = offset
    elif whence == os.SEEK_END:
      if offset > self._max_buffer_size:
        raise CommandException('Invalid SEEK_END offset %s on streaming '
                               'upload. Only %s can be buffered.' %
                               (offset, self._max_buffer_size))
      # Read to the end and rely on buffering to handle the offset.
      while self.read(self._max_buffer_size):
        pass
      # Now we're at the end.
      self._position -= offset
    else:
      raise CommandException('Invalid seek mode on streaming upload. '
                             '(mode %s, offset %s)' % (whence, offset))

  def close(self):  # pylint: disable=invalid-name
    """Closes the wrapped stream."""
    return self._orig_fp.close()
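

if __name__ == '__main__':
  # Minimal usage sketch, not part of the original module: it demonstrates
  # the limited backwards-seek support that lets a resumable upload replay
  # recently uploaded bytes after a transient server error. Passing
  # test_small_buffer=True skips the chunk-size check so a tiny illustrative
  # buffer can be used; real callers size the buffer from
  # GetJsonResumableChunkSize().
  import io

  wrapper = ResumableStreamingJsonUploadWrapper(
      io.BytesIO(b'0123456789abcdef'), max_buffer_size=8,
      test_small_buffer=True)
  first = wrapper.read(4)               # b'0123'; buffered as it is read.
  wrapper.seek(0)                       # Backwards seek within the buffer.
  assert wrapper.read(4) == first       # Replays the buffered bytes.
  wrapper.seek(2, whence=os.SEEK_END)   # Drain the stream, rewind 2 bytes.
  assert wrapper.read() == b'ef'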