Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(278)

Side by Side Diff: gslib/daisy_chain_wrapper.py

Issue 698893003: Update checked in version of gsutil to version 4.6 (Closed) Base URL: http://dart.googlecode.com/svn/third_party/gsutil/
Patch Set: Created 6 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « gslib/cs_api_map.py ('k') | gslib/exception.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Property Changes:
Added: svn:eol-style
+ LF
OLDNEW
(Empty)
1 # -*- coding: utf-8 -*-
2 # Copyright 2014 Google Inc. All Rights Reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 """Wrapper for use in daisy-chained copies."""
16
17 from collections import deque
18 import os
19 import threading
20 import time
21
22 from gslib.cloud_api import BadRequestException
23 from gslib.cloud_api import CloudApi
24 from gslib.util import CreateLock
25 from gslib.util import TRANSFER_BUFFER_SIZE
26
27
# This controls the number of bytes downloaded per download request.
# We do not buffer this many bytes in memory at a time - that is controlled by
# DaisyChainWrapper.max_buffer_size. This is the upper bound of bytes that may
# be unnecessarily downloaded if there is a break in the resumable upload.
DAISY_CHAIN_CHUNK_SIZE = 1024*1024*100
33
34
class BufferWrapper(object):
  """File-like write target that feeds the shared in-memory daisy-chain buffer.

  An instance is handed to the download as its "file pointer"; each write()
  blocks (via a yield-loop) until the shared buffer has room, then appends the
  chunk for the upload side to consume.
  """

  def __init__(self, daisy_chain_wrapper):
    """Provides a buffered write interface for a file download.

    Args:
      daisy_chain_wrapper: DaisyChainWrapper instance to use for buffer and
                           locking.
    """
    self.daisy_chain_wrapper = daisy_chain_wrapper

  def write(self, data):  # pylint: disable=invalid-name
    """Waits for space in the buffer, then writes data to the buffer."""
    wrapper = self.daisy_chain_wrapper
    while True:
      with wrapper.lock:
        has_space = wrapper.bytes_buffered < wrapper.max_buffer_size
      if has_space:
        break
      # No room yet; give up our timeslice so the upload can drain the buffer.
      time.sleep(0)
    chunk_size = len(data)
    with wrapper.lock:
      wrapper.buffer.append(data)
      wrapper.bytes_buffered += chunk_size
60
61
class DaisyChainWrapper(object):
  """Wrapper class for daisy-chaining a cloud download to an upload.

  This class instantiates a BufferWrapper object to buffer the download into
  memory, consuming a maximum of max_buffer_size. It implements intelligent
  behavior around read and seek that allow for all of the operations necessary
  to copy a file.

  This class is coupled with the XML and JSON implementations in that it
  expects that small buffers (maximum of TRANSFER_BUFFER_SIZE) in size will be
  used.
  """

  def __init__(self, src_url, src_obj_size, gsutil_api):
    """Initializes the daisy chain wrapper.

    Args:
      src_url: Source CloudUrl to copy from.
      src_obj_size: Size of source object.
      gsutil_api: gsutil Cloud API to use for the copy.
    """
    # Current read position for the upload file pointer.
    self.position = 0
    # Deque of downloaded-but-not-yet-uploaded chunks; the download thread
    # appends (via BufferWrapper.write) and read() pops from the left.
    self.buffer = deque()

    # Total bytes currently held in self.buffer; capped at max_buffer_size.
    self.bytes_buffered = 0
    self.max_buffer_size = 1024 * 1024  # 1 MB

    # We save one buffer's worth of data as a special case for boto,
    # which seeks back one buffer and rereads to compute hashes. This is
    # unnecessary because we can just compare cloud hash digests at the end,
    # but it allows this to work without modifying boto.
    self.last_position = 0
    self.last_data = None

    # Protects buffer, position, bytes_buffered, last_position, and last_data.
    self.lock = CreateLock()

    self.src_obj_size = src_obj_size
    self.src_url = src_url

    # Sharing gsutil_api between the upload and download threads is safe
    # because the download thread calls only GetObjectMedia, which creates a
    # new HTTP connection independent of gsutil_api. Thus, it will not share
    # an HTTP connection with the upload.
    self.gsutil_api = gsutil_api

    # Set when seek() needs the download thread to abandon its current
    # position; the thread checks it after each chunk and exits if set.
    self.download_thread = None
    self.stop_download = threading.Event()
    self.StartDownloadThread()

  def StartDownloadThread(self, start_byte=0):
    """Starts the download thread for the source object (from start_byte)."""

    def PerformDownload(start_byte):
      """Downloads the source object in chunks.

      This function checks the stop_download event and exits early if it is set.
      It should be set when there is an error during the daisy-chain upload,
      then this function can be called again with the upload's current position
      as start_byte.

      Args:
        start_byte: Byte from which to begin the download.
      """
      # TODO: Support resumable downloads. This would require the BufferWrapper
      # object to support seek() and tell() which requires coordination with
      # the upload.
      # Download in DAISY_CHAIN_CHUNK_SIZE pieces so that a restart (after a
      # break in the upload) wastes at most one chunk's worth of bytes.
      while start_byte + DAISY_CHAIN_CHUNK_SIZE < self.src_obj_size:
        self.gsutil_api.GetObjectMedia(
            self.src_url.bucket_name, self.src_url.object_name,
            BufferWrapper(self), start_byte=start_byte,
            end_byte=start_byte + DAISY_CHAIN_CHUNK_SIZE - 1,
            generation=self.src_url.generation, object_size=self.src_obj_size,
            download_strategy=CloudApi.DownloadStrategy.ONE_SHOT,
            provider=self.src_url.scheme)
        if self.stop_download.is_set():
          # Download thread needs to be restarted, so exit.
          self.stop_download.clear()
          return
        start_byte += DAISY_CHAIN_CHUNK_SIZE
      # Final (possibly only) chunk: download from start_byte to the end of
      # the object (no end_byte).
      self.gsutil_api.GetObjectMedia(
          self.src_url.bucket_name, self.src_url.object_name,
          BufferWrapper(self), start_byte=start_byte,
          generation=self.src_url.generation, object_size=self.src_obj_size,
          download_strategy=CloudApi.DownloadStrategy.ONE_SHOT,
          provider=self.src_url.scheme)

    # TODO: If we do gzip encoding transforms mid-transfer, this will fail.
    self.download_thread = threading.Thread(target=PerformDownload,
                                            args=(start_byte,))
    self.download_thread.start()

  def read(self, amt=None):  # pylint: disable=invalid-name
    """Exposes a stream from the in-memory buffer to the upload.

    Args:
      amt: Number of bytes requested; must be a positive value no larger than
           TRANSFER_BUFFER_SIZE (unbounded reads are rejected because the
           buffer holds only small pieces of the object at a time).

    Returns:
      Up to amt bytes from the buffer, or the empty string at end of object
      or when 0 bytes are requested.

    Raises:
      BadRequestException: If amt is None, exceeds TRANSFER_BUFFER_SIZE, or a
          buffered chunk is larger than amt.
    """
    if self.position == self.src_obj_size or amt == 0:
      # If there is no data left or 0 bytes were requested, return an empty
      # string so callers can still call len() and read(0).
      return ''
    if amt is None or amt > TRANSFER_BUFFER_SIZE:
      raise BadRequestException(
          'Invalid HTTP read size %s during daisy chain operation, '
          'expected <= %s.' % (amt, TRANSFER_BUFFER_SIZE))
    while True:
      with self.lock:
        if self.buffer:
          break
      # Buffer was empty, yield thread priority so the download thread can fill.
      time.sleep(0)
    with self.lock:
      # Pop a whole downloaded chunk and remember it so that a subsequent
      # boto seek back to last_position can replay it (see seek).
      data = self.buffer.popleft()
      self.last_position = self.position
      self.last_data = data
      data_len = len(data)
      self.position += data_len
      self.bytes_buffered -= data_len
    if data_len > amt:
      raise BadRequestException(
          'Invalid read during daisy chain operation, got data of size '
          '%s, expected size %s.' % (data_len, amt))
    return data

  def tell(self):  # pylint: disable=invalid-name
    """Returns the current read position of the upload file pointer."""
    with self.lock:
      return self.position

  def seek(self, offset, whence=os.SEEK_SET):  # pylint: disable=invalid-name
    """Repositions the upload stream, restarting the download if necessary.

    Supported seeks: SEEK_END with zero offset (jump to end of object),
    SEEK_SET to the current position (no-op), to last_position (replay the
    last chunk for boto's hash recomputation), or to any other offset (which
    forces a full download restart from that byte).

    Args:
      offset: Target byte offset.
      whence: os.SEEK_SET or os.SEEK_END (os.SEEK_CUR is not supported).

    Raises:
      BadRequestException: For a non-zero offset with os.SEEK_END.
      IOError: For an unsupported whence value.
    """
    restart_download = False
    if whence == os.SEEK_END:
      if offset:
        raise BadRequestException(
            'Invalid seek during daisy chain operation. Non-zero offset %s '
            'from os.SEEK_END is not supported' % offset)
      with self.lock:
        self.last_position = self.position
        self.last_data = None
        # Safe because we check position against src_obj_size in read.
        self.position = self.src_obj_size
    elif whence == os.SEEK_SET:
      with self.lock:
        if offset == self.position:
          pass
        elif offset == self.last_position:
          self.position = self.last_position
          if self.last_data:
            # If we seek to end and then back, we won't have last_data; we'll
            # get it on the next call to read.
            self.buffer.appendleft(self.last_data)
            self.bytes_buffered += len(self.last_data)
        else:
          # Once a download is complete, boto seeks to 0 and re-reads to
          # compute the hash if an md5 isn't already present (for example a GCS
          # composite object), so we have to re-download the whole object.
          # Also, when daisy-chaining to a resumable upload, on error the
          # service may have received any number of the bytes; the download
          # needs to be restarted from that point.
          restart_download = True

      if restart_download:
        self.stop_download.set()

        # Consume any remaining bytes in the download thread so that
        # the thread can exit, then restart the thread at the desired position.
        while self.download_thread.is_alive():
          with self.lock:
            while self.bytes_buffered:
              self.bytes_buffered -= len(self.buffer.popleft())
          time.sleep(0)

        with self.lock:
          self.position = offset
          self.buffer = deque()
          self.bytes_buffered = 0
          self.last_position = 0
          self.last_data = None
        self.StartDownloadThread(start_byte=offset)
    else:
      raise IOError('Daisy-chain download wrapper does not support '
                    'seek mode %s' % whence)

  def seekable(self):  # pylint: disable=invalid-name
    """Returns True; see seek() for the supported subset of seek operations."""
    return True
OLDNEW
« no previous file with comments | « gslib/cs_api_map.py ('k') | gslib/exception.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698