# -*- coding: utf-8 -*-
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Wrapper for use in daisy-chained copies."""

from collections import deque
import os
import threading
import time

from gslib.cloud_api import BadRequestException
from gslib.cloud_api import CloudApi
from gslib.util import CreateLock
from gslib.util import TRANSFER_BUFFER_SIZE


# This controls the number of bytes downloaded per download request.
# We do not buffer this many bytes in memory at a time - that is controlled by
# DaisyChainWrapper.max_buffer_size. This is the upper bound of bytes that may
# be unnecessarily downloaded if there is a break in the resumable upload.
_DEFAULT_DOWNLOAD_CHUNK_SIZE = 1024 * 1024 * 100


class BufferWrapper(object):
  """Wraps the download file pointer to use our in-memory buffer."""

  def __init__(self, daisy_chain_wrapper):
    """Provides a buffered write interface for a file download.

    Args:
      daisy_chain_wrapper: DaisyChainWrapper instance to use for buffer and
          locking.
    """
    self.daisy_chain_wrapper = daisy_chain_wrapper

  def write(self, data):  # pylint: disable=invalid-name
    """Waits for space in the buffer, then writes data to the buffer."""
    while True:
      with self.daisy_chain_wrapper.lock:
        if (self.daisy_chain_wrapper.bytes_buffered <
            self.daisy_chain_wrapper.max_buffer_size):
          break
      # Buffer was full; yield thread priority so the upload can pull from it.
      time.sleep(0)
    data_len = len(data)
    if data_len:
      with self.daisy_chain_wrapper.lock:
        self.daisy_chain_wrapper.buffer.append(data)
        self.daisy_chain_wrapper.bytes_buffered += data_len


class DaisyChainWrapper(object):
  """Wrapper class for daisy-chaining a cloud download to an upload.

  This class instantiates a BufferWrapper object to buffer the download into
  memory, consuming a maximum of max_buffer_size. It implements intelligent
  behavior around read and seek that allows for all of the operations
  necessary to copy a file.

  This class is coupled with the XML and JSON implementations in that it
  expects that small buffers (a maximum of TRANSFER_BUFFER_SIZE in size) will
  be used.
  """

  def __init__(self, src_url, src_obj_size, gsutil_api, progress_callback=None,
               download_chunk_size=_DEFAULT_DOWNLOAD_CHUNK_SIZE):
    """Initializes the daisy chain wrapper.

    Args:
      src_url: Source CloudUrl to copy from.
      src_obj_size: Size of the source object.
      gsutil_api: gsutil Cloud API to use for the copy.
      progress_callback: Optional callback function for progress notifications
          for the download thread. Receives calls with arguments
          (bytes_transferred, total_size).
      download_chunk_size: Integer number of bytes to download per
          GetObjectMedia request. This is the upper bound of bytes that may be
          unnecessarily downloaded if there is a break in the resumable upload.
    """
    # Current read position for the upload file pointer.
    self.position = 0
    self.buffer = deque()

    self.bytes_buffered = 0
    # Maximum number of bytes to hold in memory at a time.
    self.max_buffer_size = 1024 * 1024  # 1 MiB

    self._download_chunk_size = download_chunk_size

    # We save one buffer's worth of data as a special case for boto,
    # which seeks back one buffer and rereads to compute hashes. This is
    # unnecessary because we can just compare cloud hash digests at the end,
    # but it allows this to work without modifying boto.
    self.last_position = 0
    self.last_data = None

    # Protects buffer, position, bytes_buffered, last_position, and last_data.
    self.lock = CreateLock()

    # Protects download_exception.
    self.download_exception_lock = CreateLock()

    self.src_obj_size = src_obj_size
    self.src_url = src_url

    # It is safe to share this between the upload and download threads because
    # the download thread calls only GetObjectMedia, which creates a new HTTP
    # connection independent of gsutil_api. Thus, it will not share an HTTP
    # connection with the upload.
    self.gsutil_api = gsutil_api

    # If self.download_thread dies due to an exception, the exception is saved
    # here so that it can also be raised in the upload thread.
    self.download_exception = None
    self.download_thread = None
    self.progress_callback = progress_callback
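    # Set to signal the download thread that it should exit, so that it can
    # be restarted from a new position (see seek).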
    self.stop_download = threading.Event()
    self.StartDownloadThread(progress_callback=self.progress_callback)

  def StartDownloadThread(self, start_byte=0, progress_callback=None):
    """Starts the download thread for the source object (from start_byte)."""

    def PerformDownload(start_byte, progress_callback):
      """Downloads the source object in chunks.

      This function checks the stop_download event and exits early if it is
      set. The event should be set when there is an error during the
      daisy-chain upload; this function can then be called again with the
      upload's current position as start_byte.

      Args:
        start_byte: Byte from which to begin the download.
        progress_callback: Optional callback function for progress
            notifications. Receives calls with arguments
            (bytes_transferred, total_size).
      """
      # TODO: Support resumable downloads. This would require the
      # BufferWrapper object to support seek() and tell(), which requires
      # coordination with the upload.
      try:
        while start_byte + self._download_chunk_size < self.src_obj_size:
          self.gsutil_api.GetObjectMedia(
              self.src_url.bucket_name, self.src_url.object_name,
              BufferWrapper(self), start_byte=start_byte,
              end_byte=start_byte + self._download_chunk_size - 1,
              generation=self.src_url.generation, object_size=self.src_obj_size,
              download_strategy=CloudApi.DownloadStrategy.ONE_SHOT,
              provider=self.src_url.scheme, progress_callback=progress_callback)
          if self.stop_download.is_set():
            # Download thread needs to be restarted, so exit.
            self.stop_download.clear()
            return
          start_byte += self._download_chunk_size
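        # Download the remaining bytes (from start_byte through the end of
        # the object) in a single final request.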
        self.gsutil_api.GetObjectMedia(
            self.src_url.bucket_name, self.src_url.object_name,
            BufferWrapper(self), start_byte=start_byte,
            generation=self.src_url.generation, object_size=self.src_obj_size,
            download_strategy=CloudApi.DownloadStrategy.ONE_SHOT,
            provider=self.src_url.scheme, progress_callback=progress_callback)
      # We catch all exceptions here because we want to store them.
      except Exception as e:  # pylint: disable=broad-except
        # Save the exception so that it can be seen in the upload thread.
        with self.download_exception_lock:
          self.download_exception = e
        raise

    # TODO: If we do gzip encoding transforms mid-transfer, this will fail.
    self.download_thread = threading.Thread(
        target=PerformDownload,
        args=(start_byte, progress_callback))
    self.download_thread.start()

  def read(self, amt=None):  # pylint: disable=invalid-name
    """Exposes a stream from the in-memory buffer to the upload."""
    if self.position == self.src_obj_size or amt == 0:
      # If there is no data left or 0 bytes were requested, return an empty
      # string so callers can still call len() on the result and read(0).
      return ''
    if amt is None or amt > TRANSFER_BUFFER_SIZE:
      raise BadRequestException(
          'Invalid HTTP read size %s during daisy chain operation, '
          'expected <= %s.' % (amt, TRANSFER_BUFFER_SIZE))

    while True:
      with self.lock:
        if self.buffer:
          break
      with self.download_exception_lock:
        if self.download_exception:
          # Download thread died, so we will never recover. Raise the
          # exception that killed it.
          raise self.download_exception  # pylint: disable=raising-bad-type
      # Buffer was empty; yield thread priority so the download thread can
      # fill it.
      time.sleep(0)
    with self.lock:
      # TODO: Need to handle the caller requesting less than a
      # transfer_buffer_size worth of data.
      data = self.buffer.popleft()
      self.last_position = self.position
      self.last_data = data
      data_len = len(data)
      self.position += data_len
      self.bytes_buffered -= data_len
    if data_len > amt:
      raise BadRequestException(
          'Invalid read during daisy chain operation, got data of size '
          '%s, expected size %s.' % (data_len, amt))
    return data

  def tell(self):  # pylint: disable=invalid-name
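    """Returns the current read position of the upload file pointer."""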
    with self.lock:
      return self.position

  def seek(self, offset, whence=os.SEEK_SET):  # pylint: disable=invalid-name
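    """Sets the current read position, restarting the download if necessary.

    Only os.SEEK_SET, and os.SEEK_END with a zero offset, are supported, as
    these are the only seeks a daisy-chain copy needs to perform.
    """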
    restart_download = False
    if whence == os.SEEK_END:
      if offset:
        raise IOError(
            'Invalid seek during daisy chain operation. Non-zero offset %s '
            'from os.SEEK_END is not supported.' % offset)
      with self.lock:
        self.last_position = self.position
        self.last_data = None
        # Safe because we check position against src_obj_size in read.
        self.position = self.src_obj_size
    elif whence == os.SEEK_SET:
      with self.lock:
        if offset == self.position:
          pass
        elif offset == self.last_position:
          self.position = self.last_position
          if self.last_data:
            # If we seek to end and then back, we won't have last_data; we'll
            # get it on the next call to read.
            self.buffer.appendleft(self.last_data)
            self.bytes_buffered += len(self.last_data)
        else:
          # Once a download is complete, boto seeks to 0 and re-reads to
          # compute the hash if an md5 isn't already present (for example, a
          # GCS composite object), so we have to re-download the whole object.
          # Also, when daisy-chaining to a resumable upload, on error the
          # service may have received any number of the bytes; the download
          # needs to be restarted from that point.
          restart_download = True

      if restart_download:
        self.stop_download.set()

        # Consume any remaining bytes in the download thread so that
        # the thread can exit, then restart the thread at the desired position.
        while self.download_thread.is_alive():
          with self.lock:
            while self.bytes_buffered:
              self.bytes_buffered -= len(self.buffer.popleft())
          time.sleep(0)

        with self.lock:
          self.position = offset
          self.buffer = deque()
          self.bytes_buffered = 0
          self.last_position = 0
          self.last_data = None
        self.StartDownloadThread(start_byte=offset,
                                 progress_callback=self.progress_callback)
    else:
      raise IOError('Daisy-chain download wrapper does not support '
                    'seek mode %s.' % whence)

  def seekable(self):  # pylint: disable=invalid-name
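    """Indicates that this wrapper supports seek() (with restrictions)."""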
    return True