Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(50)

Side by Side Diff: third_party/google_appengine_cloudstorage/cloudstorage/storage_api.py

Issue 139303023: add GCS support to docs server (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: bumped versions Created 6 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2012 Google Inc. All Rights Reserved.
2
3 """Python wrappers for the Google Storage RESTful API."""
4
5
6
7
8
9 __all__ = ['ReadBuffer',
10 'StreamingBuffer',
11 ]
12
13 import collections
14 import logging
15 import os
16 import urlparse
17
18 from . import api_utils
19 from . import errors
20 from . import rest_api
21
22 try:
23 from google.appengine.api import urlfetch
24 from google.appengine.ext import ndb
25 except ImportError:
26 from google.appengine.api import urlfetch
27 from google.appengine.ext import ndb
28
29
30
31 class _StorageApi(rest_api._RestApi):
32 """A simple wrapper for the Google Storage RESTful API.
33
34 WARNING: Do NOT directly use this api. It's an implementation detail
35 and is subject to change at any release.
36
37 All async methods have similar args and returns.
38
39 Args:
40 path: The path to the Google Storage object or bucket, e.g.
41 '/mybucket/myfile' or '/mybucket'.
42 **kwd: Options for urlfetch. e.g.
43 headers={'content-type': 'text/plain'}, payload='blah'.
44
45 Returns:
46 A ndb Future. When fulfilled, future.get_result() should return
47 a tuple of (status, headers, content) that represents a HTTP response
48 of Google Cloud Storage XML API.
49 """
50
51 api_url = 'https://storage.googleapis.com'
52 read_only_scope = 'https://www.googleapis.com/auth/devstorage.read_only'
53 read_write_scope = 'https://www.googleapis.com/auth/devstorage.read_write'
54 full_control_scope = 'https://www.googleapis.com/auth/devstorage.full_control'
55
56 def __getstate__(self):
57 """Store state as part of serialization/pickling.
58
59 Returns:
60 A tuple (of dictionaries) with the state of this object
61 """
62 return (super(_StorageApi, self).__getstate__(), {'api_url': self.api_url})
63
64 def __setstate__(self, state):
65 """Restore state as part of deserialization/unpickling.
66
67 Args:
68 state: the tuple from a __getstate__ call
69 """
70 superstate, localstate = state
71 super(_StorageApi, self).__setstate__(superstate)
72 self.api_url = localstate['api_url']
73
74 @api_utils._eager_tasklet
75 @ndb.tasklet
76 def do_request_async(self, url, method='GET', headers=None, payload=None,
77 deadline=None, callback=None):
78 """Inherit docs.
79
80 This method translates urlfetch exceptions to more service specific ones.
81 """
82 if headers is None:
83 headers = {}
84 if 'x-goog-api-version' not in headers:
85 headers['x-goog-api-version'] = '2'
86 headers['accept-encoding'] = 'gzip, *'
87 try:
88 resp_tuple = yield super(_StorageApi, self).do_request_async(
89 url, method=method, headers=headers, payload=payload,
90 deadline=deadline, callback=callback)
91 except urlfetch.DownloadError, e:
92 raise errors.TimeoutError(
93 'Request to Google Cloud Storage timed out.', e)
94
95 raise ndb.Return(resp_tuple)
96
97
98 def post_object_async(self, path, **kwds):
99 """POST to an object."""
100 return self.do_request_async(self.api_url + path, 'POST', **kwds)
101
102 def put_object_async(self, path, **kwds):
103 """PUT an object."""
104 return self.do_request_async(self.api_url + path, 'PUT', **kwds)
105
106 def get_object_async(self, path, **kwds):
107 """GET an object.
108
109 Note: No payload argument is supported.
110 """
111 return self.do_request_async(self.api_url + path, 'GET', **kwds)
112
113 def delete_object_async(self, path, **kwds):
114 """DELETE an object.
115
116 Note: No payload argument is supported.
117 """
118 return self.do_request_async(self.api_url + path, 'DELETE', **kwds)
119
120 def head_object_async(self, path, **kwds):
121 """HEAD an object.
122
123 Depending on request headers, HEAD returns various object properties,
124 e.g. Content-Length, Last-Modified, and ETag.
125
126 Note: No payload argument is supported.
127 """
128 return self.do_request_async(self.api_url + path, 'HEAD', **kwds)
129
130 def get_bucket_async(self, path, **kwds):
131 """GET a bucket."""
132 return self.do_request_async(self.api_url + path, 'GET', **kwds)
133
134
135 _StorageApi = rest_api.add_sync_methods(_StorageApi)
136
137
138 class ReadBuffer(object):
139 """A class for reading Google storage files."""
140
141 DEFAULT_BUFFER_SIZE = 1024 * 1024
142 MAX_REQUEST_SIZE = 30 * DEFAULT_BUFFER_SIZE
143
144 def __init__(self,
145 api,
146 path,
147 buffer_size=DEFAULT_BUFFER_SIZE,
148 max_request_size=MAX_REQUEST_SIZE):
149 """Constructor.
150
151 Args:
152 api: A StorageApi instance.
153 path: Path to the object, e.g. '/mybucket/myfile'.
154 buffer_size: buffer size. The ReadBuffer keeps
155 one buffer. But there may be a pending future that contains
156 a second buffer. This size must be less than max_request_size.
157 max_request_size: Max bytes to request in one urlfetch.
158 """
159 self._api = api
160 self.name = path
161 self.closed = False
162
163 assert buffer_size <= max_request_size
164 self._buffer_size = buffer_size
165 self._max_request_size = max_request_size
166 self._offset = 0
167 self._buffer = _Buffer()
168 self._etag = None
169
170 self._request_next_buffer()
171
172 status, headers, _ = self._api.head_object(path)
173 errors.check_status(status, [200], path, resp_headers=headers)
174 self._file_size = long(headers['content-length'])
175 self._check_etag(headers.get('etag'))
176 if self._file_size == 0:
177 self._buffer_future = None
178
179 def __getstate__(self):
180 """Store state as part of serialization/pickling.
181
182 The contents of the read buffer are not stored, only the current offset for
183 data read by the client. A new read buffer is established at unpickling.
184 The head information for the object (file size and etag) are stored to
185 reduce startup and ensure the file has not changed.
186
187 Returns:
188 A dictionary with the state of this object
189 """
190 return {'api': self._api,
191 'name': self.name,
192 'buffer_size': self._buffer_size,
193 'request_size': self._max_request_size,
194 'etag': self._etag,
195 'size': self._file_size,
196 'offset': self._offset,
197 'closed': self.closed}
198
199 def __setstate__(self, state):
200 """Restore state as part of deserialization/unpickling.
201
202 Args:
203 state: the dictionary from a __getstate__ call
204
205 Along with restoring the state, pre-fetch the next read buffer.
206 """
207 self._api = state['api']
208 self.name = state['name']
209 self._buffer_size = state['buffer_size']
210 self._max_request_size = state['request_size']
211 self._etag = state['etag']
212 self._file_size = state['size']
213 self._offset = state['offset']
214 self._buffer = _Buffer()
215 self.closed = state['closed']
216 self._buffer_future = None
217 if self._remaining() and not self.closed:
218 self._request_next_buffer()
219
220 def __iter__(self):
221 """Iterator interface.
222
223 Note the ReadBuffer container itself is the iterator. It's
224 (quote PEP0234)
225 'destructive: they consumes all the values and a second iterator
226 cannot easily be created that iterates independently over the same values.
227 You could open the file for the second time, or seek() to the beginning.'
228
229 Returns:
230 Self.
231 """
232 return self
233
234 def next(self):
235 line = self.readline()
236 if not line:
237 raise StopIteration()
238 return line
239
240 def readline(self, size=-1):
241 """Read one line delimited by '\n' from the file.
242
243 A trailing newline character is kept in the string. It may be absent when a
244 file ends with an incomplete line. If the size argument is non-negative,
245 it specifies the maximum string size (counting the newline) to return.
246 A negative size is the same as unspecified. Empty string is returned
247 only when EOF is encountered immediately.
248
249 Args:
250 size: Maximum number of bytes to read. If not specified, readline stops
251 only on '\n' or EOF.
252
253 Returns:
254 The data read as a string.
255
256 Raises:
257 IOError: When this buffer is closed.
258 """
259 self._check_open()
260 if size == 0 or not self._remaining():
261 return ''
262
263 data_list = []
264 newline_offset = self._buffer.find_newline(size)
265 while newline_offset < 0:
266 data = self._buffer.read(size)
267 size -= len(data)
268 self._offset += len(data)
269 data_list.append(data)
270 if size == 0 or not self._remaining():
271 return ''.join(data_list)
272 self._buffer.reset(self._buffer_future.get_result())
273 self._request_next_buffer()
274 newline_offset = self._buffer.find_newline(size)
275
276 data = self._buffer.read_to_offset(newline_offset + 1)
277 self._offset += len(data)
278 data_list.append(data)
279
280 return ''.join(data_list)
281
282 def read(self, size=-1):
283 """Read data from RAW file.
284
285 Args:
286 size: Number of bytes to read as integer. Actual number of bytes
287 read is always equal to size unless EOF is reached. If size is
288 negative or unspecified, read the entire file.
289
290 Returns:
291 data read as str.
292
293 Raises:
294 IOError: When this buffer is closed.
295 """
296 self._check_open()
297 if not self._remaining():
298 return ''
299
300 data_list = []
301 while True:
302 remaining = self._buffer.remaining()
303 if size >= 0 and size < remaining:
304 data_list.append(self._buffer.read(size))
305 self._offset += size
306 break
307 else:
308 size -= remaining
309 self._offset += remaining
310 data_list.append(self._buffer.read())
311
312 if self._buffer_future is None:
313 if size < 0 or size >= self._remaining():
314 needs = self._remaining()
315 else:
316 needs = size
317 data_list.extend(self._get_segments(self._offset, needs))
318 self._offset += needs
319 break
320
321 if self._buffer_future:
322 self._buffer.reset(self._buffer_future.get_result())
323 self._buffer_future = None
324
325 if self._buffer_future is None:
326 self._request_next_buffer()
327 return ''.join(data_list)
328
329 def _remaining(self):
330 return self._file_size - self._offset
331
332 def _request_next_buffer(self):
333 """Request next buffer.
334
335 Requires self._offset and self._buffer are in consistent state
336 """
337 self._buffer_future = None
338 next_offset = self._offset + self._buffer.remaining()
339 if not hasattr(self, '_file_size') or next_offset != self._file_size:
340 self._buffer_future = self._get_segment(next_offset,
341 self._buffer_size)
342
343 def _get_segments(self, start, request_size):
344 """Get segments of the file from Google Storage as a list.
345
346 A large request is broken into segments to avoid hitting urlfetch
347 response size limit. Each segment is returned from a separate urlfetch.
348
349 Args:
350 start: start offset to request. Inclusive. Have to be within the
351 range of the file.
352 request_size: number of bytes to request.
353
354 Returns:
355 A list of file segments in order
356 """
357 if not request_size:
358 return []
359
360 end = start + request_size
361 futures = []
362
363 while request_size > self._max_request_size:
364 futures.append(self._get_segment(start, self._max_request_size))
365 request_size -= self._max_request_size
366 start += self._max_request_size
367 if start < end:
368 futures.append(self._get_segment(start, end-start))
369 return [fut.get_result() for fut in futures]
370
371 @ndb.tasklet
372 def _get_segment(self, start, request_size):
373 """Get a segment of the file from Google Storage.
374
375 Args:
376 start: start offset of the segment. Inclusive. Have to be within the
377 range of the file.
378 request_size: number of bytes to request. Have to be small enough
379 for a single urlfetch request. May go over the logical range of the
380 file.
381
382 Yields:
383 a segment [start, start + request_size) of the file.
384
385 Raises:
386 ValueError: if the file has changed while reading.
387 """
388 end = start + request_size - 1
389 content_range = '%d-%d' % (start, end)
390 headers = {'Range': 'bytes=' + content_range}
391 status, resp_headers, content = yield self._api.get_object_async(
392 self.name, headers=headers)
393 errors.check_status(status, [200, 206], self.name, headers, resp_headers)
394 self._check_etag(resp_headers.get('etag'))
395 raise ndb.Return(content)
396
397 def _check_etag(self, etag):
398 """Check if etag is the same across requests to GCS.
399
400 If self._etag is None, set it. If etag is set, check that the new
401 etag equals the old one.
402
403 In the __init__ method, we fire one HEAD and one GET request using
404 ndb tasklet. One of them would return first and set the first value.
405
406 Args:
407 etag: etag from a GCS HTTP response. None if etag is not part of the
408 response header. It could be None for example in the case of GCS
409 composite file.
410
411 Raises:
412 ValueError: if two etags are not equal.
413 """
414 if etag is None:
415 return
416 elif self._etag is None:
417 self._etag = etag
418 elif self._etag != etag:
419 raise ValueError('File on GCS has changed while reading.')
420
421 def close(self):
422 self.closed = True
423 self._buffer = None
424 self._buffer_future = None
425
426 def __enter__(self):
427 return self
428
429 def __exit__(self, atype, value, traceback):
430 self.close()
431 return False
432
433 def seek(self, offset, whence=os.SEEK_SET):
434 """Set the file's current offset.
435
436 Note if the new offset is out of bound, it is adjusted to either 0 or EOF.
437
438 Args:
439 offset: seek offset as number.
440 whence: seek mode. Supported modes are os.SEEK_SET (absolute seek),
441 os.SEEK_CUR (seek relative to the current position), and os.SEEK_END
442 (seek relative to the end, offset should be negative).
443
444 Raises:
445 IOError: When this buffer is closed.
446 ValueError: When whence is invalid.
447 """
448 self._check_open()
449
450 self._buffer.reset()
451 self._buffer_future = None
452
453 if whence == os.SEEK_SET:
454 self._offset = offset
455 elif whence == os.SEEK_CUR:
456 self._offset += offset
457 elif whence == os.SEEK_END:
458 self._offset = self._file_size + offset
459 else:
460 raise ValueError('Whence mode %s is invalid.' % str(whence))
461
462 self._offset = min(self._offset, self._file_size)
463 self._offset = max(self._offset, 0)
464 if self._remaining():
465 self._request_next_buffer()
466
467 def tell(self):
468 """Tell the file's current offset.
469
470 Returns:
471 current offset in reading this file.
472
473 Raises:
474 IOError: When this buffer is closed.
475 """
476 self._check_open()
477 return self._offset
478
479 def _check_open(self):
480 if self.closed:
481 raise IOError('Buffer is closed.')
482
483 def seekable(self):
484 return True
485
486 def readable(self):
487 return True
488
489 def writable(self):
490 return False
491
492
493 class _Buffer(object):
494 """In memory buffer."""
495
496 def __init__(self):
497 self.reset()
498
499 def reset(self, content='', offset=0):
500 self._buffer = content
501 self._offset = offset
502
503 def read(self, size=-1):
504 """Returns bytes from self._buffer and update related offsets.
505
506 Args:
507 size: number of bytes to read starting from current offset.
508 Read the entire buffer if negative.
509
510 Returns:
511 Requested bytes from buffer.
512 """
513 if size < 0:
514 offset = len(self._buffer)
515 else:
516 offset = self._offset + size
517 return self.read_to_offset(offset)
518
519 def read_to_offset(self, offset):
520 """Returns bytes from self._buffer and update related offsets.
521
522 Args:
523 offset: read from current offset to this offset, exclusive.
524
525 Returns:
526 Requested bytes from buffer.
527 """
528 assert offset >= self._offset
529 result = self._buffer[self._offset: offset]
530 self._offset += len(result)
531 return result
532
533 def remaining(self):
534 return len(self._buffer) - self._offset
535
536 def find_newline(self, size=-1):
537 """Search for newline char in buffer starting from current offset.
538
539 Args:
540 size: number of bytes to search. -1 means all.
541
542 Returns:
543 offset of newline char in buffer. -1 if doesn't exist.
544 """
545 if size < 0:
546 return self._buffer.find('\n', self._offset)
547 return self._buffer.find('\n', self._offset, self._offset + size)
548
549
550 class StreamingBuffer(object):
551 """A class for creating large objects using the 'resumable' API.
552
553 The API is a subset of the Python writable stream API sufficient to
554 support writing zip files using the zipfile module.
555
556 The exact sequence of calls and use of headers is documented at
557 https://developers.google.com/storage/docs/developer-guide#unknownresumables
558 """
559
560 _blocksize = 256 * 1024
561
562 _maxrequestsize = 16 * _blocksize
563
564 def __init__(self,
565 api,
566 path,
567 content_type=None,
568 gcs_headers=None):
569 """Constructor.
570
571 Args:
572 api: A StorageApi instance.
573 path: Path to the object, e.g. '/mybucket/myfile'.
574 content_type: Optional content-type; Default value is
575 delegate to Google Cloud Storage.
576 gcs_headers: additional gs headers as a str->str dict, e.g
577 {'x-goog-acl': 'private', 'x-goog-meta-foo': 'foo'}.
578 """
579 assert self._maxrequestsize > self._blocksize
580 assert self._maxrequestsize % self._blocksize == 0
581
582 self._api = api
583 self.name = path
584 self.closed = False
585
586 self._buffer = collections.deque()
587 self._buffered = 0
588 self._written = 0
589 self._offset = 0
590
591 headers = {'x-goog-resumable': 'start'}
592 if content_type:
593 headers['content-type'] = content_type
594 if gcs_headers:
595 headers.update(gcs_headers)
596 status, resp_headers, _ = self._api.post_object(path, headers=headers)
597 errors.check_status(status, [201], path, headers, resp_headers)
598 loc = resp_headers.get('location')
599 if not loc:
600 raise IOError('No location header found in 201 response')
601 parsed = urlparse.urlparse(loc)
602 self._path_with_token = '%s?%s' % (self.name, parsed.query)
603
604 def __getstate__(self):
605 """Store state as part of serialization/pickling.
606
607 The contents of the write buffer are stored. Writes to the underlying
608 storage are required to be on block boundaries (_blocksize) except for the
609 last write. In the worst case the pickled version of this object may be
610 slightly larger than the blocksize.
611
612 Returns:
613 A dictionary with the state of this object
614
615 """
616 return {'api': self._api,
617 'name': self.name,
618 'path_token': self._path_with_token,
619 'buffer': self._buffer,
620 'buffered': self._buffered,
621 'written': self._written,
622 'offset': self._offset,
623 'closed': self.closed}
624
625 def __setstate__(self, state):
626 """Restore state as part of deserialization/unpickling.
627
628 Args:
629 state: the dictionary from a __getstate__ call
630 """
631 self._api = state['api']
632 self._path_with_token = state['path_token']
633 self._buffer = state['buffer']
634 self._buffered = state['buffered']
635 self._written = state['written']
636 self._offset = state['offset']
637 self.closed = state['closed']
638 self.name = state['name']
639
640 def write(self, data):
641 """Write some bytes.
642
643 Args:
644 data: data to write. str.
645
646 Raises:
647 TypeError: if data is not of type str.
648 """
649 self._check_open()
650 if not isinstance(data, str):
651 raise TypeError('Expected str but got %s.' % type(data))
652 if not data:
653 return
654 self._buffer.append(data)
655 self._buffered += len(data)
656 self._offset += len(data)
657 if self._buffered >= self._blocksize:
658 self._flush()
659
660 def flush(self):
661 """Dummy API.
662
663 This API is provided because the zipfile module uses it. It is a
664 no-op because Google Storage *requires* that all writes except for
665 the final one are multiples on 256K bytes aligned on 256K-byte
666 boundaries.
667 """
668 self._check_open()
669
670 def tell(self):
671 """Return the total number of bytes passed to write() so far.
672
673 (There is no seek() method.)
674 """
675 self._check_open()
676 return self._offset
677
678 def close(self):
679 """Flush the buffer and finalize the file.
680
681 When this returns the new file is available for reading.
682 """
683 if not self.closed:
684 self.closed = True
685 self._flush(finish=True)
686 self._buffer = None
687
688 def __enter__(self):
689 return self
690
691 def __exit__(self, atype, value, traceback):
692 self.close()
693 return False
694
695 def _flush(self, finish=False):
696 """Internal API to flush.
697
698 This is called only when the total amount of buffered data is at
699 least self._blocksize, or to flush the final (incomplete) block of
700 the file with finish=True.
701 """
702 flush_len = 0 if finish else self._blocksize
703 last = False
704
705 while self._buffered >= flush_len:
706 buffer = []
707 buffered = 0
708
709 while self._buffer:
710 buf = self._buffer.popleft()
711 size = len(buf)
712 self._buffered -= size
713 buffer.append(buf)
714 buffered += size
715 if buffered >= self._maxrequestsize:
716 break
717
718 if buffered > self._maxrequestsize:
719 excess = buffered - self._maxrequestsize
720 elif finish:
721 excess = 0
722 else:
723 excess = buffered % self._blocksize
724
725 if excess:
726 over = buffer.pop()
727 size = len(over)
728 assert size >= excess
729 buffered -= size
730 head, tail = over[:-excess], over[-excess:]
731 self._buffer.appendleft(tail)
732 self._buffered += len(tail)
733 if head:
734 buffer.append(head)
735 buffered += len(head)
736
737 if finish:
738 last = not self._buffered
739 self._send_data(''.join(buffer), last)
740 if last:
741 break
742
743 def _send_data(self, data, last):
744 """Send the block to the storage service and update self._written."""
745 headers = {}
746 length = self._written + len(data)
747
748 if data:
749 headers['content-range'] = ('bytes %d-%d/%s' %
750 (self._written, length-1,
751 length if last else '*'))
752 else:
753 headers['content-range'] = ('bytes */%s' %
754 length if last else '*')
755 status, response_headers, _ = self._api.put_object(
756 self._path_with_token, payload=data, headers=headers)
757 if last:
758 expected = 200
759 else:
760 expected = 308
761 if expected == 308 and status == 200:
762 logging.warning(
763 'This upload session for file %s has already been finalized. It is '
764 'likely this is an outdated copy of an already closed file handler.'
765 'Request headers: %r.\n'
766 'Response headers: %r.\n', self.name, headers, response_headers)
767 else:
768 errors.check_status(status, [expected], self.name, headers,
769 response_headers,
770 {'upload_path': self._path_with_token})
771 self._written += len(data)
772
773 def _check_open(self):
774 if self.closed:
775 raise IOError('Buffer is closed.')
776
777 def seekable(self):
778 return False
779
780 def readable(self):
781 return False
782
783 def writable(self):
784 return True
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698