# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper class for streaming resumable uploads."""

import collections
import os

from gslib.exception import CommandException
from gslib.util import GetJsonResumableChunkSize


class ResumableStreamingJsonUploadWrapper(object):
24 """Wraps an input stream in a buffer for resumable uploads. | |
25 | |
26 This class takes a non-seekable input stream, buffers it, and exposes it | |
27 as a stream with limited seek capabilities such that it can be used in a | |
28 resumable JSON API upload. | |
29 | |
30 max_buffer_size bytes of buffering is supported. | |
31 """ | |
32 | |
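  # A minimal usage sketch (illustrative only; io.BytesIO stands in for a
  # non-seekable stream, and test_small_buffer=True skips the chunk-size
  # check so a tiny buffer can be demonstrated):
  #
  #   import io
  #   wrapper = ResumableStreamingJsonUploadWrapper(
  #       io.BytesIO(b'0123456789'), max_buffer_size=8,
  #       test_small_buffer=True)
  #   first = wrapper.read(4)  # b'0123'; also retained in the buffer.
  #   wrapper.seek(0)          # Allowed: offset 0 is still buffered.
  #   assert wrapper.read(4) == first
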
  def __init__(self, stream, max_buffer_size, test_small_buffer=False):
    """Initializes the wrapper.

    Args:
      stream: Input stream.
      max_buffer_size: Maximum size of internal buffer; should be >= the
          chunk size of the resumable upload API to ensure that at least
          one full chunk write can be replayed in the event of a server
          error.
      test_small_buffer: Skip check for buffer size vs. chunk size, for
          testing.
    """
    self._orig_fp = stream

    if not test_small_buffer and max_buffer_size < GetJsonResumableChunkSize():
      raise CommandException('Resumable streaming upload created with buffer '
                             'size %s, JSON resumable upload chunk size %s. '
                             'Buffer size must be >= JSON resumable upload '
                             'chunk size to ensure that uploads can be '
                             'resumed.' % (max_buffer_size,
                                           GetJsonResumableChunkSize()))

    self._max_buffer_size = max_buffer_size
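    # Chunks in the deque collectively span stream offsets
    # [_buffer_start, _buffer_end); backwards seeks are only possible
    # within this window.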
    self._buffer = collections.deque()
    self._buffer_start = 0
    self._buffer_end = 0
    self._position = 0

  def read(self, size=-1):  # pylint: disable=invalid-name
    """Reads from the wrapped stream.

    Args:
      size: The number of bytes to read. If omitted or negative, the entire
          contents of the stream will be read and returned.

    Returns:
      Bytes from the wrapped stream.
    """
    read_all_bytes = size is None or size < 0
    if read_all_bytes:
      bytes_remaining = self._max_buffer_size
    else:
      bytes_remaining = size
    data = b''
    buffered_data = []
    if self._position < self._buffer_end:
      # There was a backwards seek, so read from the buffer first.

      # TODO: Performance test to validate whether it is worth re-aligning
      # the buffers in this case. Also, seeking through the buffer for
      # each read on a long catch-up is probably not performant, but we'd
      # need a more complex data structure than a deque to get around this.
      pos_in_buffer = self._buffer_start
      buffer_index = 0
      # First, find the start position in the buffer. When this loop exits,
      # buffer_index will refer to the buffer chunk that overlaps
      # self._position, and pos_in_buffer will be the stream offset of that
      # chunk's first byte (so pos_in_buffer <= self._position).
      while pos_in_buffer + len(self._buffer[buffer_index]) < self._position:
        pos_in_buffer += len(self._buffer[buffer_index])
        buffer_index += 1

      # Read until we've read enough or we're out of buffer.
      while pos_in_buffer < self._buffer_end and bytes_remaining > 0:
        buffer_len = len(self._buffer[buffer_index])
        # This describes how far into the current buffer self._position is.
        offset_from_position = self._position - pos_in_buffer
        bytes_available_this_buffer = buffer_len - offset_from_position
        read_size = min(bytes_available_this_buffer, bytes_remaining)
        buffered_data.append(
            self._buffer[buffer_index]
            [offset_from_position:offset_from_position + read_size])
        bytes_remaining -= read_size
        pos_in_buffer += buffer_len
        buffer_index += 1
        self._position += read_size
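      # Example: with chunks [b'ab', b'cd'] buffered at stream offsets 0-3
      # and self._position == 1, the scan starts in the first chunk at
      # offset 1 and yields b'b', then b'cd', until the request is filled.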

    # At this point we're guaranteed that if there are any bytes left to read,
    # then self._position == self._buffer_end, and we can read from the
    # wrapped stream if needed.
    if read_all_bytes:
      # TODO: The user is requesting a read to the end of an arbitrary-length
      # stream, which is bad because we'll need to return the data with no
      # size limit; if the stream is sufficiently long, we could run out of
      # memory. We could break this down into smaller reads and buffer it as
      # we go, but we're still left returning the data all at once to the
      # caller. We could raise, but for now trust the caller to be sane and
      # have enough memory to hold the remaining stream contents.
      new_data = self._orig_fp.read(size)
      data_len = len(new_data)
      if not buffered_data:
        data = new_data
      else:
        buffered_data.append(new_data)
        data = b''.join(buffered_data)
      self._position += data_len
    elif bytes_remaining:
      new_data = self._orig_fp.read(bytes_remaining)
      if not buffered_data:
        data = new_data
      else:
        buffered_data.append(new_data)
        data = b''.join(buffered_data)
      data_len = len(new_data)
      if data_len:
        self._position += data_len
        self._buffer.append(new_data)
        self._buffer_end += data_len
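        # Trim the buffer down to max_buffer_size. Chunks are stored whole,
        # so the last chunk popped may overshoot the limit; its tail is
        # pushed back afterwards to keep a full window of recent bytes
        # available for seeking.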
        oldest_data = None
        while self._buffer_end - self._buffer_start > self._max_buffer_size:
          oldest_data = self._buffer.popleft()
          self._buffer_start += len(oldest_data)
        if oldest_data:
          refill_amount = self._max_buffer_size - (self._buffer_end -
                                                   self._buffer_start)
          if refill_amount:
            self._buffer.appendleft(oldest_data[-refill_amount:])
            self._buffer_start -= refill_amount
    else:
      data = b''.join(buffered_data) if buffered_data else b''

    return data

  def tell(self):  # pylint: disable=invalid-name
    """Returns the current stream position."""
    return self._position

  def seekable(self):  # pylint: disable=invalid-name
    """Returns True since limited seek support exists."""
    return True

  def seek(self, offset, whence=os.SEEK_SET):  # pylint: disable=invalid-name
    """Seeks on the buffered stream.

    Args:
      offset: For os.SEEK_SET, the position to seek to; must be within the
          buffer bounds. For os.SEEK_END, a non-negative number of bytes
          before the end of the stream; must be <= max_buffer_size.
      whence: os.SEEK_SET or os.SEEK_END.

    Raises:
      CommandException if an unsupported seek mode or position is used.
    """
    if whence == os.SEEK_SET:
      if offset < self._buffer_start or offset > self._buffer_end:
        raise CommandException('Unable to resume upload because of limited '
                               'buffering available for streaming uploads. '
                               'Offset %s was requested, but only data from '
                               '%s to %s is buffered.' %
                               (offset, self._buffer_start, self._buffer_end))
      # Move to a position within the buffer.
      self._position = offset
    elif whence == os.SEEK_END:
      if offset > self._max_buffer_size:
        raise CommandException('Invalid SEEK_END offset %s on streaming '
                               'upload. Only %s can be buffered.' %
                               (offset, self._max_buffer_size))
      # Read to the end and rely on buffering to handle the offset.
      while self.read(self._max_buffer_size):
        pass
      # Now we're at the end.
      self._position -= offset
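      # E.g. seek(10, os.SEEK_END) leaves the position 10 bytes before the
      # end of the stream, so those final buffered bytes can be re-read.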
    else:
      raise CommandException('Invalid seek mode on streaming upload. '
                             '(mode %s, offset %s)' % (whence, offset))

  def close(self):  # pylint: disable=invalid-name
    return self._orig_fp.close()