Chromium Code Reviews

Side by Side Diff: gae/cloudstorage/storage_api.py

Issue 1150463002: [chrome-devtools-frontend] Migrate to cloudstorage client (Closed)
Base URL: svn://svn.chromium.org/chrome/trunk/tools/chrome-devtools-frontend
Patch Set: Created 5 years, 7 months ago
(New file: the old side of the diff is empty.)
1 # Copyright 2012 Google Inc. All Rights Reserved.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing,
10 # software distributed under the License is distributed on an
11 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
12 # either express or implied. See the License for the specific
13 # language governing permissions and limitations under the License.
14
15 """Python wrappers for the Google Storage RESTful API."""
16
17
18
19
20
21 __all__ = ['ReadBuffer',
22 'StreamingBuffer',
23 ]
24
25 import collections
26 import os
27 import urlparse
28
29 from . import api_utils
30 from . import common
31 from . import errors
32 from . import rest_api
33
34 try:
35 from google.appengine.api import urlfetch
36 from google.appengine.ext import ndb
37 except ImportError:
38 from google.appengine.api import urlfetch
39 from google.appengine.ext import ndb
40
41
42
43 def _get_storage_api(retry_params, account_id=None):
44 """Returns storage_api instance for API methods.
45
46 Args:
47 retry_params: An instance of api_utils.RetryParams. If none,
48 thread's default will be used.
49 account_id: Internal-use only.
50
51 Returns:
52 A storage_api instance to handle urlfetch work to GCS.
53 On dev appserver, this instance by default will talk to a local stub
54 unless common.ACCESS_TOKEN is set. That token will be used to talk
55 to the real GCS.
56 """
57
58
59 api = _StorageApi(_StorageApi.full_control_scope,
60 service_account_id=account_id,
61 retry_params=retry_params)
62 if common.local_run() and not common.get_access_token():
63 api.api_url = common.local_api_url()
64 if common.get_access_token():
65 api.token = common.get_access_token()
66 return api
67
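# A rough usage sketch (hypothetical values; '/mybucket/myfile' is only an
# illustration). Callers obtain the API object from this factory and use the
# synchronous wrappers that rest_api.add_sync_methods attaches further down:
#
#   retry_params = api_utils.RetryParams()  # library defaults
#   api = _get_storage_api(retry_params)
#   status, headers, content = api.head_object('/mybucket/myfile')
#   errors.check_status(status, [200], '/mybucket/myfile')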
68
69 class _StorageApi(rest_api._RestApi):
70 """A simple wrapper for the Google Storage RESTful API.
71
72 WARNING: Do NOT directly use this api. It's an implementation detail
73 and is subject to change at any release.
74
75 All async methods have similar args and returns.
76
77 Args:
78 path: The path to the Google Storage object or bucket, e.g.
79 '/mybucket/myfile' or '/mybucket'.
80 **kwd: Options for urlfetch. e.g.
81 headers={'content-type': 'text/plain'}, payload='blah'.
82
83 Returns:
84 A ndb Future. When fulfilled, future.get_result() should return
85 a tuple of (status, headers, content) that represents an HTTP response
86 of Google Cloud Storage XML API.
87 """
88
89 api_url = 'https://storage.googleapis.com'
90 read_only_scope = 'https://www.googleapis.com/auth/devstorage.read_only'
91 read_write_scope = 'https://www.googleapis.com/auth/devstorage.read_write'
92 full_control_scope = 'https://www.googleapis.com/auth/devstorage.full_control'
93
94 def __getstate__(self):
95 """Store state as part of serialization/pickling.
96
97 Returns:
98 A tuple (of dictionaries) with the state of this object
99 """
100 return (super(_StorageApi, self).__getstate__(), {'api_url': self.api_url})
101
102 def __setstate__(self, state):
103 """Restore state as part of deserialization/unpickling.
104
105 Args:
106 state: the tuple from a __getstate__ call
107 """
108 superstate, localstate = state
109 super(_StorageApi, self).__setstate__(superstate)
110 self.api_url = localstate['api_url']
111
112 @api_utils._eager_tasklet
113 @ndb.tasklet
114 def do_request_async(self, url, method='GET', headers=None, payload=None,
115 deadline=None, callback=None):
116 """Inherit docs.
117
118 This method translates urlfetch exceptions to more service specific ones.
119 """
120 if headers is None:
121 headers = {}
122 if 'x-goog-api-version' not in headers:
123 headers['x-goog-api-version'] = '2'
124 headers['accept-encoding'] = 'gzip, *'
125 try:
126 resp_tuple = yield super(_StorageApi, self).do_request_async(
127 url, method=method, headers=headers, payload=payload,
128 deadline=deadline, callback=callback)
129 except urlfetch.DownloadError, e:
130 raise errors.TimeoutError(
131 'Request to Google Cloud Storage timed out.', e)
132
133 raise ndb.Return(resp_tuple)
134
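# Sketch of calling this from another ndb tasklet (hypothetical helper):
#
#   @ndb.tasklet
#   def _head(api, path):
#     status, headers, content = yield api.do_request_async(
#         api.api_url + path, 'HEAD')
#     raise ndb.Return((status, headers))
#
# A urlfetch.DownloadError raised inside the yield surfaces to the caller as
# errors.TimeoutError.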
135
136 def post_object_async(self, path, **kwds):
137 """POST to an object."""
138 return self.do_request_async(self.api_url + path, 'POST', **kwds)
139
140 def put_object_async(self, path, **kwds):
141 """PUT an object."""
142 return self.do_request_async(self.api_url + path, 'PUT', **kwds)
143
144 def get_object_async(self, path, **kwds):
145 """GET an object.
146
147 Note: No payload argument is supported.
148 """
149 return self.do_request_async(self.api_url + path, 'GET', **kwds)
150
151 def delete_object_async(self, path, **kwds):
152 """DELETE an object.
153
154 Note: No payload argument is supported.
155 """
156 return self.do_request_async(self.api_url + path, 'DELETE', **kwds)
157
158 def head_object_async(self, path, **kwds):
159 """HEAD an object.
160
161 Depending on request headers, HEAD returns various object properties,
162 e.g. Content-Length, Last-Modified, and ETag.
163
164 Note: No payload argument is supported.
165 """
166 return self.do_request_async(self.api_url + path, 'HEAD', **kwds)
167
168 def get_bucket_async(self, path, **kwds):
169 """GET a bucket."""
170 return self.do_request_async(self.api_url + path, 'GET', **kwds)
171
172 # pylint: disable=too-many-locals
173 def compose_object(self, file_list, destination_file, content_type):
174 """COMPOSE multiple objects together.
175
176 Uses the given list of files to call put_object with the compose flag.
177 This call merges all the files into the destination file.
178
179 Args:
180 file_list: list of dicts with the file name.
181 destination_file: Path to the destination file.
182 content_type: Content type for the destination file.
183 """
184
185 xml_setting_list = ['<ComposeRequest>']
186
187 for meta_data in file_list:
188 xml_setting_list.append('<Component>')
189 for key, val in meta_data.iteritems():
190 xml_setting_list.append('<%s>%s</%s>' % (key, val, key))
191 xml_setting_list.append('</Component>')
192 xml_setting_list.append('</ComposeRequest>')
193 xml = ''.join(xml_setting_list)
194
195 if content_type is not None:
196 headers = {'Content-Type': content_type}
197 else:
198 headers = None
199 # pylint: disable=no-member
200 status, resp_headers, content = self.put_object(
201 api_utils._quote_filename(destination_file) + '?compose',
202 payload=xml,
203 headers=headers)
204 errors.check_status(status, [200], destination_file, resp_headers, body=content)
205
206
207 _StorageApi = rest_api.add_sync_methods(_StorageApi)
208
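# add_sync_methods (above) generates a blocking twin for every *_async method,
# e.g. put_object for put_object_async; compose_object and the buffer classes
# below rely on those wrappers. A hypothetical compose call, assuming the
# component dicts use the XML element name 'Name' for each source object:
#
#   api = _get_storage_api(None)
#   api.compose_object([{'Name': 'part-0'}, {'Name': 'part-1'}],
#                      '/mybucket/merged', 'text/plain')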
209
210 class ReadBuffer(object):
211 """A class for reading Google storage files."""
212
213 DEFAULT_BUFFER_SIZE = 1024 * 1024
214 MAX_REQUEST_SIZE = 30 * DEFAULT_BUFFER_SIZE
215
216 def __init__(self,
217 api,
218 path,
219 offset=0,
220 buffer_size=DEFAULT_BUFFER_SIZE,
221 max_request_size=MAX_REQUEST_SIZE):
222 """Constructor.
223
224 Args:
225 api: A StorageApi instance.
226 path: Quoted/escaped path to the object, e.g. /mybucket/myfile
227 buffer_size: buffer size. The ReadBuffer keeps
228 one buffer. But there may be a pending future that contains
229 a second buffer. This size must be less than max_request_size.
230 max_request_size: Max bytes to request in one urlfetch.
231 offset: Number of bytes to skip at the start of the file. If None, 0 is
232 used.
233 """
234 self._api = api
235 self._path = path
236 self.name = api_utils._unquote_filename(path)
237 self.closed = False
238
239 assert buffer_size <= max_request_size
240 self._buffer_size = buffer_size
241 self._max_request_size = max_request_size
242 self._offset = offset
243 self._buffer = _Buffer()
244 self._etag = None
245
246 get_future = self._get_segment(offset, self._buffer_size, check_response=False)
247
248 status, headers, content = self._api.head_object(path)
249 errors.check_status(status, [200], path, resp_headers=headers, body=content)
250 self._file_size = long(common.get_stored_content_length(headers))
251 self._check_etag(headers.get('etag'))
252
253 self._buffer_future = None
254
255 if self._file_size != 0:
256 content, check_response_closure = get_future.get_result()
257 check_response_closure()
258 self._buffer.reset(content)
259 self._request_next_buffer()
260
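# Reading sketch (hypothetical object path; readers are normally created by
# the library's open() helper rather than instantiated directly):
#
#   api = _get_storage_api(None)
#   with ReadBuffer(api, '/mybucket/myfile') as f:
#     header = f.read(1024)
#     for line in f:        # iterates over the remaining lines
#       handle(line)        # 'handle' is a placeholder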
261 def __getstate__(self):
262 """Store state as part of serialization/pickling.
263
264 The contents of the read buffer are not stored, only the current offset for
265 data read by the client. A new read buffer is established at unpickling.
266 The head information for the object (file size and etag) is stored to
267 reduce startup time and to ensure the file has not changed.
268
269 Returns:
270 A dictionary with the state of this object
271 """
272 return {'api': self._api,
273 'path': self._path,
274 'buffer_size': self._buffer_size,
275 'request_size': self._max_request_size,
276 'etag': self._etag,
277 'size': self._file_size,
278 'offset': self._offset,
279 'closed': self.closed}
280
281 def __setstate__(self, state):
282 """Restore state as part of deserialization/unpickling.
283
284 Args:
285 state: the dictionary from a __getstate__ call
286
287 Along with restoring the state, pre-fetch the next read buffer.
288 """
289 self._api = state['api']
290 self._path = state['path']
291 self.name = api_utils._unquote_filename(self._path)
292 self._buffer_size = state['buffer_size']
293 self._max_request_size = state['request_size']
294 self._etag = state['etag']
295 self._file_size = state['size']
296 self._offset = state['offset']
297 self._buffer = _Buffer()
298 self.closed = state['closed']
299 self._buffer_future = None
300 if self._remaining() and not self.closed:
301 self._request_next_buffer()
302
303 def __iter__(self):
304 """Iterator interface.
305
306 Note the ReadBuffer container itself is the iterator. It's
307 (quote PEP0234)
308 'destructive: they consumes all the values and a second iterator
309 cannot easily be created that iterates independently over the same values.
310 You could open the file for the second time, or seek() to the beginning.'
311
312 Returns:
313 Self.
314 """
315 return self
316
317 def next(self):
318 line = self.readline()
319 if not line:
320 raise StopIteration()
321 return line
322
323 def readline(self, size=-1):
324 """Read one line delimited by '\n' from the file.
325
326 A trailing newline character is kept in the string. It may be absent when a
327 file ends with an incomplete line. If the size argument is non-negative,
328 it specifies the maximum string size (counting the newline) to return.
329 A negative size is the same as unspecified. Empty string is returned
330 only when EOF is encountered immediately.
331
332 Args:
333 size: Maximum number of bytes to read. If not specified, readline stops
334 only on '\n' or EOF.
335
336 Returns:
337 The data read as a string.
338
339 Raises:
340 IOError: When this buffer is closed.
341 """
342 self._check_open()
343 if size == 0 or not self._remaining():
344 return ''
345
346 data_list = []
347 newline_offset = self._buffer.find_newline(size)
348 while newline_offset < 0:
349 data = self._buffer.read(size)
350 size -= len(data)
351 self._offset += len(data)
352 data_list.append(data)
353 if size == 0 or not self._remaining():
354 return ''.join(data_list)
355 self._buffer.reset(self._buffer_future.get_result())
356 self._request_next_buffer()
357 newline_offset = self._buffer.find_newline(size)
358
359 data = self._buffer.read_to_offset(newline_offset + 1)
360 self._offset += len(data)
361 data_list.append(data)
362
363 return ''.join(data_list)
364
365 def read(self, size=-1):
366 """Read data from RAW file.
367
368 Args:
369 size: Number of bytes to read as integer. Actual number of bytes
370 read is always equal to size unless EOF is reached. If size is
371 negative or unspecified, read the entire file.
372
373 Returns:
374 data read as str.
375
376 Raises:
377 IOError: When this buffer is closed.
378 """
379 self._check_open()
380 if not self._remaining():
381 return ''
382
383 data_list = []
384 while True:
385 remaining = self._buffer.remaining()
386 if size >= 0 and size < remaining:
387 data_list.append(self._buffer.read(size))
388 self._offset += size
389 break
390 else:
391 size -= remaining
392 self._offset += remaining
393 data_list.append(self._buffer.read())
394
395 if self._buffer_future is None:
396 if size < 0 or size >= self._remaining():
397 needs = self._remaining()
398 else:
399 needs = size
400 data_list.extend(self._get_segments(self._offset, needs))
401 self._offset += needs
402 break
403
404 if self._buffer_future:
405 self._buffer.reset(self._buffer_future.get_result())
406 self._buffer_future = None
407
408 if self._buffer_future is None:
409 self._request_next_buffer()
410 return ''.join(data_list)
411
412 def _remaining(self):
413 return self._file_size - self._offset
414
415 def _request_next_buffer(self):
416 """Request next buffer.
417
418 Requires self._offset and self._buffer are in consistent state.
419 """
420 self._buffer_future = None
421 next_offset = self._offset + self._buffer.remaining()
422 if next_offset != self._file_size:
423 self._buffer_future = self._get_segment(next_offset,
424 self._buffer_size)
425
426 def _get_segments(self, start, request_size):
427 """Get segments of the file from Google Storage as a list.
428
429 A large request is broken into segments to avoid hitting urlfetch
430 response size limit. Each segment is returned from a separate urlfetch.
431
432 Args:
433 start: start offset to request. Inclusive. Must be within the
434 range of the file.
435 request_size: number of bytes to request.
436
437 Returns:
438 A list of file segments in order
439 """
440 if not request_size:
441 return []
442
443 end = start + request_size
444 futures = []
445
446 while request_size > self._max_request_size:
447 futures.append(self._get_segment(start, self._max_request_size))
448 request_size -= self._max_request_size
449 start += self._max_request_size
450 if start < end:
451 futures.append(self._get_segment(start, end - start))
452 return [fut.get_result() for fut in futures]
453
454 @ndb.tasklet
455 def _get_segment(self, start, request_size, check_response=True):
456 """Get a segment of the file from Google Storage.
457
458 Args:
459 start: start offset of the segment. Inclusive. Must be within the
460 range of the file.
461 request_size: number of bytes to request. Must be small enough
462 for a single urlfetch request. May go over the logical range of the
463 file.
464 check_response: True to check the validity of GCS response automatically
465 before the future returns. False otherwise. See Yields section.
466
467 Yields:
468 If check_response is True, the segment [start, start + request_size)
469 of the file.
470 Otherwise, a tuple. The first element is the unverified file segment.
471 The second element is a closure that checks response. Caller should
472 first invoke the closure before consuming the file segment.
473
474 Raises:
475 ValueError: if the file has changed while reading.
476 """
477 end = start + request_size - 1
478 content_range = '%d-%d' % (start, end)
479 headers = {'Range': 'bytes=' + content_range}
480 status, resp_headers, content = yield self._api.get_object_async(
481 self._path, headers=headers)
482 def _checker():
483 errors.check_status(status, [200, 206], self._path, headers,
484 resp_headers, body=content)
485 self._check_etag(resp_headers.get('etag'))
486 if check_response:
487 _checker()
488 raise ndb.Return(content)
489 raise ndb.Return(content, _checker)
490
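# For example, _get_segment(0, 1024 * 1024) issues a GET with the header
# 'Range: bytes=0-1048575' and accepts a 200 or 206 response.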
491 def _check_etag(self, etag):
492 """Check if etag is the same across requests to GCS.
493
494 If self._etag is None, set it. If etag is set, check that the new
495 etag equals the old one.
496
497 In the __init__ method, we fire one HEAD and one GET request using
498 ndb tasklet. One of them would return first and set the first value.
499
500 Args:
501 etag: etag from a GCS HTTP response. None if etag is not part of the
502 response header. It could be None for example in the case of GCS
503 composite file.
504
505 Raises:
506 ValueError: if two etags are not equal.
507 """
508 if etag is None:
509 return
510 elif self._etag is None:
511 self._etag = etag
512 elif self._etag != etag:
513 raise ValueError('File on GCS has changed while reading.')
514
515 def close(self):
516 self.closed = True
517 self._buffer = None
518 self._buffer_future = None
519
520 def __enter__(self):
521 return self
522
523 def __exit__(self, atype, value, traceback):
524 self.close()
525 return False
526
527 def seek(self, offset, whence=os.SEEK_SET):
528 """Set the file's current offset.
529
530 Note if the new offset is out of bounds, it is adjusted to either 0 or EOF.
531
532 Args:
533 offset: seek offset as number.
534 whence: seek mode. Supported modes are os.SEEK_SET (absolute seek),
535 os.SEEK_CUR (seek relative to the current position), and os.SEEK_END
536 (seek relative to the end, offset should be negative).
537
538 Raises:
539 IOError: When this buffer is closed.
540 ValueError: When whence is invalid.
541 """
542 self._check_open()
543
544 self._buffer.reset()
545 self._buffer_future = None
546
547 if whence == os.SEEK_SET:
548 self._offset = offset
549 elif whence == os.SEEK_CUR:
550 self._offset += offset
551 elif whence == os.SEEK_END:
552 self._offset = self._file_size + offset
553 else:
554 raise ValueError('Whence mode %s is invalid.' % str(whence))
555
556 self._offset = min(self._offset, self._file_size)
557 self._offset = max(self._offset, 0)
558 if self._remaining():
559 self._request_next_buffer()
560
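# Seek sketch (continuing the hypothetical reader above):
#
#   f.seek(-100, os.SEEK_END)  # 100 bytes before EOF, clamped into range
#   tail = f.read()
#   f.seek(0)                  # whence defaults to os.SEEK_SET
#   assert f.tell() == 0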
561 def tell(self):
562 """Tell the file's current offset.
563
564 Returns:
565 current offset in reading this file.
566
567 Raises:
568 IOError: When this buffer is closed.
569 """
570 self._check_open()
571 return self._offset
572
573 def _check_open(self):
574 if self.closed:
575 raise IOError('Buffer is closed.')
576
577 def seekable(self):
578 return True
579
580 def readable(self):
581 return True
582
583 def writable(self):
584 return False
585
586
587 class _Buffer(object):
588 """In memory buffer."""
589
590 def __init__(self):
591 self.reset()
592
593 def reset(self, content='', offset=0):
594 self._buffer = content
595 self._offset = offset
596
597 def read(self, size=-1):
598 """Returns bytes from self._buffer and updates related offsets.
599
600 Args:
601 size: number of bytes to read starting from current offset.
602 Read the entire buffer if negative.
603
604 Returns:
605 Requested bytes from buffer.
606 """
607 if size < 0:
608 offset = len(self._buffer)
609 else:
610 offset = self._offset + size
611 return self.read_to_offset(offset)
612
613 def read_to_offset(self, offset):
614 """Returns bytes from self._buffer and updates related offsets.
615
616 Args:
617 offset: read from current offset to this offset, exclusive.
618
619 Returns:
620 Requested bytes from buffer.
621 """
622 assert offset >= self._offset
623 result = self._buffer[self._offset: offset]
624 self._offset += len(result)
625 return result
626
627 def remaining(self):
628 return len(self._buffer) - self._offset
629
630 def find_newline(self, size=-1):
631 """Search for newline char in buffer starting from current offset.
632
633 Args:
634 size: number of bytes to search. -1 means all.
635
636 Returns:
637 offset of newline char in buffer. -1 if doesn't exist.
638 """
639 if size < 0:
640 return self._buffer.find('\n', self._offset)
641 return self._buffer.find('\n', self._offset, self._offset + size)
642
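# _Buffer sketch: reset() installs a new string, read()/read_to_offset()
# consume it, and find_newline() searches without consuming:
#
#   b = _Buffer()
#   b.reset('ab\ncd')
#   b.find_newline()     # -> 2
#   b.read_to_offset(3)  # -> 'ab\n'
#   b.remaining()        # -> 2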
643
644 class StreamingBuffer(object):
645 """A class for creating large objects using the 'resumable' API.
646
647 The API is a subset of the Python writable stream API sufficient to
648 support writing zip files using the zipfile module.
649
650 The exact sequence of calls and use of headers is documented at
651 https://developers.google.com/storage/docs/developer-guide#unknownresumables
652 """
653
654 _blocksize = 256 * 1024
655
656 _flushsize = 8 * _blocksize
657
658 _maxrequestsize = 9 * 4 * _blocksize
659
660 def __init__(self,
661 api,
662 path,
663 content_type=None,
664 gcs_headers=None):
665 """Constructor.
666
667 Args:
668 api: A StorageApi instance.
669 path: Quoted/escaped path to the object, e.g. /mybucket/myfile
670 content_type: Optional content type. If not supplied, the content type
671 is left for Google Cloud Storage to determine.
672 gcs_headers: additional gs headers as a str->str dict, e.g
673 {'x-goog-acl': 'private', 'x-goog-meta-foo': 'foo'}.
674 Raises:
675 IOError: When this location can not be found.
676 """
677 assert self._maxrequestsize > self._blocksize
678 assert self._maxrequestsize % self._blocksize == 0
679 assert self._maxrequestsize >= self._flushsize
680
681 self._api = api
682 self._path = path
683
684 self.name = api_utils._unquote_filename(path)
685 self.closed = False
686
687 self._buffer = collections.deque()
688 self._buffered = 0
689 self._written = 0
690 self._offset = 0
691
692 headers = {'x-goog-resumable': 'start'}
693 if content_type:
694 headers['content-type'] = content_type
695 if gcs_headers:
696 headers.update(gcs_headers)
697 status, resp_headers, content = self._api.post_object(path, headers=headers)
698 errors.check_status(status, [201], path, headers, resp_headers,
699 body=content)
700 loc = resp_headers.get('location')
701 if not loc:
702 raise IOError('No location header found in 201 response')
703 parsed = urlparse.urlparse(loc)
704 self._path_with_token = '%s?%s' % (self._path, parsed.query)
705
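# Writing sketch (hypothetical object path; writers are normally created by
# the library's open() helper rather than instantiated directly):
#
#   api = _get_storage_api(None)
#   with StreamingBuffer(api, '/mybucket/newfile',
#                        content_type='text/plain',
#                        gcs_headers={'x-goog-acl': 'private'}) as f:
#     f.write('hello ')
#     f.write('world\n')
#   # leaving the block calls close(), which finalizes the resumable upload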
706 def __getstate__(self):
707 """Store state as part of serialization/pickling.
708
709 The contents of the write buffer are stored. Writes to the underlying
710 storage are required to be on block boundaries (_blocksize) except for the
711 last write. In the worst case the pickled version of this object may be
712 slightly larger than the blocksize.
713
714 Returns:
715 A dictionary with the state of this object
716
717 """
718 return {'api': self._api,
719 'path': self._path,
720 'path_token': self._path_with_token,
721 'buffer': self._buffer,
722 'buffered': self._buffered,
723 'written': self._written,
724 'offset': self._offset,
725 'closed': self.closed}
726
727 def __setstate__(self, state):
728 """Restore state as part of deserialization/unpickling.
729
730 Args:
731 state: the dictionary from a __getstate__ call
732 """
733 self._api = state['api']
734 self._path_with_token = state['path_token']
735 self._buffer = state['buffer']
736 self._buffered = state['buffered']
737 self._written = state['written']
738 self._offset = state['offset']
739 self.closed = state['closed']
740 self._path = state['path']
741 self.name = api_utils._unquote_filename(self._path)
742
743 def write(self, data):
744 """Write some bytes.
745
746 Args:
747 data: data to write. str.
748
749 Raises:
750 TypeError: if data is not of type str.
751 """
752 self._check_open()
753 if not isinstance(data, str):
754 raise TypeError('Expected str but got %s.' % type(data))
755 if not data:
756 return
757 self._buffer.append(data)
758 self._buffered += len(data)
759 self._offset += len(data)
760 if self._buffered >= self._flushsize:
761 self._flush()
762
763 def flush(self):
764 """Flush as much as possible to GCS.
765
766 GCS *requires* that all writes except for the final one align on
767 256KB boundaries. So the internal buffer may still have < 256KB bytes left
768 after flush.
769 """
770 self._check_open()
771 self._flush(finish=False)
772
773 def tell(self):
774 """Return the total number of bytes passed to write() so far.
775
776 (There is no seek() method.)
777 """
778 return self._offset
779
780 def close(self):
781 """Flush the buffer and finalize the file.
782
783 When this returns the new file is available for reading.
784 """
785 if not self.closed:
786 self.closed = True
787 self._flush(finish=True)
788 self._buffer = None
789
790 def __enter__(self):
791 return self
792
793 def __exit__(self, atype, value, traceback):
794 self.close()
795 return False
796
797 def _flush(self, finish=False):
798 """Internal API to flush.
799
800 Buffer is flushed to GCS only when the total amount of buffered data is at
801 least self._blocksize, or to flush the final (incomplete) block of
802 the file with finish=True.
803 """
804 while ((finish and self._buffered >= 0) or
805 (not finish and self._buffered >= self._blocksize)):
806 tmp_buffer = []
807 tmp_buffer_len = 0
808
809 excess = 0
810 while self._buffer:
811 buf = self._buffer.popleft()
812 size = len(buf)
813 self._buffered -= size
814 tmp_buffer.append(buf)
815 tmp_buffer_len += size
816 if tmp_buffer_len >= self._maxrequestsize:
817 excess = tmp_buffer_len - self._maxrequestsize
818 break
819 if not finish and (
820 tmp_buffer_len % self._blocksize + self._buffered <
821 self._blocksize):
822 excess = tmp_buffer_len % self._blocksize
823 break
824
825 if excess:
826 over = tmp_buffer.pop()
827 size = len(over)
828 assert size >= excess
829 tmp_buffer_len -= size
830 head, tail = over[:-excess], over[-excess:]
831 self._buffer.appendleft(tail)
832 self._buffered += len(tail)
833 if head:
834 tmp_buffer.append(head)
835 tmp_buffer_len += len(head)
836
837 data = ''.join(tmp_buffer)
838 file_len = '*'
839 if finish and not self._buffered:
840 file_len = self._written + len(data)
841 self._send_data(data, self._written, file_len)
842 self._written += len(data)
843 if file_len != '*':
844 break
845
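# Worked example of the flush arithmetic (assumed sizes): with 600 KB
# buffered, flush() sends the largest 256 KB-aligned prefix, 512 KB, via
# _send_data with content-range 'bytes 0-524287/*' and keeps the remaining
# 88 KB buffered. close() then sends those 88 KB as the final block with
# 'bytes 524288-614399/614400', which finalizes the object at 614400 bytes.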
846 def _send_data(self, data, start_offset, file_len):
847 """Send the block to the storage service.
848
849 This is a utility method that does not modify self.
850
851 Args:
852 data: data to send in str.
853 start_offset: start offset of the data in relation to the file.
854 file_len: an int if this is the last data to append to the file.
855 Otherwise '*'.
856 """
857 headers = {}
858 end_offset = start_offset + len(data) - 1
859
860 if data:
861 headers['content-range'] = ('bytes %d-%d/%s' %
862 (start_offset, end_offset, file_len))
863 else:
864 headers['content-range'] = ('bytes */%s' % file_len)
865
866 status, response_headers, content = self._api.put_object(
867 self._path_with_token, payload=data, headers=headers)
868 if file_len == '*':
869 expected = 308
870 else:
871 expected = 200
872 errors.check_status(status, [expected], self._path, headers,
873 response_headers, content,
874 {'upload_path': self._path_with_token})
875
876 def _get_offset_from_gcs(self):
877 """Get the last offset that has been written to GCS.
878
879 This is a utility method that does not modify self.
880
881 Returns:
882 an int of the last offset written to GCS by this upload, inclusive.
883 -1 means nothing has been written.
884 """
885 headers = {'content-range': 'bytes */*'}
886 status, response_headers, content = self._api.put_object(
887 self._path_with_token, headers=headers)
888 errors.check_status(status, [308], self._path, headers,
889 response_headers, content,
890 {'upload_path': self._path_with_token})
891 val = response_headers.get('range')
892 if val is None:
893 return -1
894 _, offset = val.rsplit('-', 1)
895 return int(offset)
896
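# For example, if GCS replies to the probe with 'range: bytes=0-524287',
# _get_offset_from_gcs returns 524287; with no 'range' header it returns -1.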
897 def _force_close(self, file_length=None):
898 """Close this buffer on file_length.
899
900 Finalize this upload immediately on file_length.
901 Contents that are still in memory will not be uploaded.
902
903 This is a utility method that does not modify self.
904
905 Args:
906 file_length: file length. Must match what has been uploaded. If None,
907 it will be queried from GCS.
908 """
909 if file_length is None:
910 file_length = self._get_offset_from_gcs() + 1
911 self._send_data('', 0, file_length)
912
913 def _check_open(self):
914 if self.closed:
915 raise IOError('Buffer is closed.')
916
917 def seekable(self):
918 return False
919
920 def readable(self):
921 return False
922
923 def writable(self):
924 return True