Chromium Code Reviews

Side by Side Diff: third_party/gsutil/gslib/resumable_streaming_upload.py

Issue 1377933002: [catapult] - Copy Telemetry's gsutilz over to third_party. (Closed) Base URL: https://github.com/catapult-project/catapult.git@master
Patch Set: Rename to gsutil. Created 5 years, 2 months ago
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper class for streaming resumable uploads."""

import collections
import os

from gslib.exception import CommandException
from gslib.util import GetJsonResumableChunkSize


class ResumableStreamingJsonUploadWrapper(object):
  """Wraps an input stream in a buffer for resumable uploads.

  This class takes a non-seekable input stream, buffers it, and exposes it
  as a stream with limited seek capabilities such that it can be used in a
  resumable JSON API upload.

  Up to max_buffer_size bytes of buffering are supported.
  """

  def __init__(self, stream, max_buffer_size, test_small_buffer=False):
    """Initializes the wrapper.

    Args:
      stream: Input stream.
      max_buffer_size: Maximum size of internal buffer; should be >= the chunk
          size of the resumable upload API to ensure that at least one full
          chunk write can be replayed in the event of a server error.
      test_small_buffer: Skip check for buffer size vs. chunk size, for testing.
    """
    self._orig_fp = stream

    if not test_small_buffer and max_buffer_size < GetJsonResumableChunkSize():
      raise CommandException('Resumable streaming upload created with buffer '
                             'size %s, JSON resumable upload chunk size %s. '
                             'Buffer size must be >= JSON resumable upload '
                             'chunk size to ensure that uploads can be '
                             'resumed.' % (max_buffer_size,
                                           GetJsonResumableChunkSize()))

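    # Bookkeeping for the buffered window: _buffer holds byte chunks in stream
    # order, _buffer_start/_buffer_end are the absolute stream offsets covered
    # by those chunks, and _position is the current logical read offset.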
    self._max_buffer_size = max_buffer_size
    self._buffer = collections.deque()
    self._buffer_start = 0
    self._buffer_end = 0
    self._position = 0

  def read(self, size=-1):  # pylint: disable=invalid-name
    """Reads from the wrapped stream.

    Args:
      size: The number of bytes to read. If omitted or negative, the entire
          contents of the stream will be read and returned.

    Returns:
      Bytes from the wrapped stream.
    """
    read_all_bytes = size is None or size < 0
    if read_all_bytes:
      bytes_remaining = self._max_buffer_size
    else:
      bytes_remaining = size
    data = b''
    buffered_data = []
    if self._position < self._buffer_end:
      # There was a backwards seek, so read from the buffer first.

      # TODO: Performance test to validate if it is worth re-aligning
      # the buffers in this case. Also, seeking through the buffer for
      # each read on a long catch-up is probably not performant, but we'd
      # need a more complex data structure than a deque to get around this.
      pos_in_buffer = self._buffer_start
      buffer_index = 0
      # First, find the start position in the buffer.
      while pos_in_buffer + len(self._buffer[buffer_index]) < self._position:
        # When this loop exits, buffer_index will refer to a buffer that
        # overlaps self._position; pos_in_buffer will be <= self._position,
        # and pos_in_buffer + len(self._buffer[buffer_index]) will be
        # >= self._position.
        pos_in_buffer += len(self._buffer[buffer_index])
        buffer_index += 1

      # Read until we've read enough or we're out of buffer.
      while pos_in_buffer < self._buffer_end and bytes_remaining > 0:
        buffer_len = len(self._buffer[buffer_index])
        # This describes how far into the current buffer self._position is.
        offset_from_position = self._position - pos_in_buffer
        bytes_available_this_buffer = buffer_len - offset_from_position
        read_size = min(bytes_available_this_buffer, bytes_remaining)
        buffered_data.append(
            self._buffer[buffer_index]
            [offset_from_position:offset_from_position + read_size])
        bytes_remaining -= read_size
        pos_in_buffer += buffer_len
        buffer_index += 1
        self._position += read_size

    # At this point we're guaranteed that if there are any bytes left to read,
    # then self._position == self._buffer_end, and we can read from the
    # wrapped stream if needed.
    if read_all_bytes:
      # TODO: The user is requesting reading until the end of an
      # arbitrary-length stream, which is bad because we'll need to return
      # data with no size limits; if the stream is sufficiently long, we could
      # run out of memory. We could break this down into smaller reads and
      # buffer it as we go, but we're still left returning the data all at
      # once to the caller. We could raise, but for now trust the caller to
      # be sane and have enough memory to hold the remaining stream contents.
      new_data = self._orig_fp.read(size)
      data_len = len(new_data)
      if not buffered_data:
        data = new_data
      else:
        buffered_data.append(new_data)
        data = b''.join(buffered_data)
      self._position += data_len
    elif bytes_remaining:
      new_data = self._orig_fp.read(bytes_remaining)
      if not buffered_data:
        data = new_data
      else:
        buffered_data.append(new_data)
        data = b''.join(buffered_data)
      data_len = len(new_data)
      if data_len:
        self._position += data_len
        self._buffer.append(new_data)
        self._buffer_end += data_len
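        # Keep the buffered window within max_buffer_size: evict whole chunks
        # from the front, then restore the tail of the last evicted chunk if
        # eviction removed more than necessary.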
        oldest_data = None
        while self._buffer_end - self._buffer_start > self._max_buffer_size:
          oldest_data = self._buffer.popleft()
          self._buffer_start += len(oldest_data)
        if oldest_data:
          refill_amount = self._max_buffer_size - (self._buffer_end -
                                                   self._buffer_start)
          if refill_amount:
            self._buffer.appendleft(oldest_data[-refill_amount:])
            self._buffer_start -= refill_amount
    else:
      data = b''.join(buffered_data) if buffered_data else b''

    return data

  def tell(self):  # pylint: disable=invalid-name
    """Returns the current stream position."""
    return self._position

  def seekable(self):  # pylint: disable=invalid-name
    """Returns true since limited seek support exists."""
    return True

  def seek(self, offset, whence=os.SEEK_SET):  # pylint: disable=invalid-name
    """Seeks on the buffered stream.

    Args:
      offset: The offset to seek to. For os.SEEK_SET, it must lie within the
          buffered region; for os.SEEK_END, it must not exceed the maximum
          buffer size.
      whence: Must be os.SEEK_SET or os.SEEK_END.

    Raises:
      CommandException if an unsupported seek mode or position is used.
    """
    if whence == os.SEEK_SET:
      if offset < self._buffer_start or offset > self._buffer_end:
        raise CommandException('Unable to resume upload because of limited '
                               'buffering available for streaming uploads. '
                               'Offset %s was requested, but only data from '
                               '%s to %s is buffered.' %
                               (offset, self._buffer_start, self._buffer_end))
      # Move to a position within the buffer.
      self._position = offset
    elif whence == os.SEEK_END:
      if offset > self._max_buffer_size:
        raise CommandException('Invalid SEEK_END offset %s on streaming '
                               'upload. Only %s can be buffered.' %
                               (offset, self._max_buffer_size))
      # Read to the end and rely on buffering to handle the offset.
      while self.read(self._max_buffer_size):
        pass
      # Now we're at the end.
      self._position -= offset
    else:
      raise CommandException('Invalid seek mode on streaming upload. '
                             '(mode %s, offset %s)' % (whence, offset))

  def close(self):  # pylint: disable=invalid-name
    return self._orig_fp.close()
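
A minimal usage sketch (not part of this patch) may help reviewers see how the wrapper behaves. It uses io.BytesIO as a stand-in for a non-seekable source such as a pipe, and passes test_small_buffer=True so a tiny buffer can be used without tripping the chunk-size check; the real upload path sizes the buffer from GetJsonResumableChunkSize().

import io

from gslib.resumable_streaming_upload import ResumableStreamingJsonUploadWrapper

# Stand-in for a non-seekable source (a pipe or socket in practice).
raw = io.BytesIO(b'0123456789abcdef')

wrapper = ResumableStreamingJsonUploadWrapper(
    raw, max_buffer_size=8, test_small_buffer=True)

first = wrapper.read(8)   # Pulls 8 bytes from the wrapped stream and buffers them.
wrapper.seek(4)           # Allowed: offset 4 is inside the buffered window [0, 8].
replay = wrapper.read(4)  # Served entirely from the buffer, no underlying read.
assert replay == first[4:]
rest = wrapper.read()     # Continues reading past the buffered region.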