Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(72)

Side by Side Diff: third_party/google_appengine_cloudstorage/cloudstorage/storage_api.py

Issue 139303023: add GCS support to docs server (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Updated third party library, rebased and fixed a path issue caused by rebasing Created 6 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2012 Google Inc. All Rights Reserved.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing,
10 # software distributed under the License is distributed on an
11 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
12 # either express or implied. See the License for the specific
13 # language governing permissions and limitations under the License.
14
15 """Python wrappers for the Google Storage RESTful API."""
16
17
18
19
20
21 __all__ = ['ReadBuffer',
22 'StreamingBuffer',
23 ]
24
25 import collections
26 import logging
27 import os
28 import urlparse
29
30 from . import api_utils
31 from . import common
32 from . import errors
33 from . import rest_api
34
35 try:
36 from google.appengine.api import urlfetch
37 from google.appengine.ext import ndb
38 except ImportError:
39 from google.appengine.api import urlfetch
40 from google.appengine.ext import ndb
41
42
43
44 def _get_storage_api(retry_params, account_id=None):
45 """Returns storage_api instance for API methods.
46
47 Args:
48 retry_params: An instance of api_utils.RetryParams. If none,
49 thread's default will be used.
50 account_id: Internal-use only.
51
52 Returns:
53 A storage_api instance to handle urlfetch work to GCS.
54 On dev appserver, this instance by default will talk to a local stub
55 unless common.ACCESS_TOKEN is set. That token will be used to talk
56 to the real GCS.
57 """
58
59
60 api = _StorageApi(_StorageApi.full_control_scope,
61 service_account_id=account_id,
62 retry_params=retry_params)
63 if common.local_run() and not common.get_access_token():
64 api.api_url = common.local_api_url()
65 if common.get_access_token():
66 api.token = common.get_access_token()
67 return api
68
69
70 class _StorageApi(rest_api._RestApi):
71 """A simple wrapper for the Google Storage RESTful API.
72
73 WARNING: Do NOT directly use this api. It's an implementation detail
74 and is subject to change at any release.
75
76 All async methods have similar args and returns.
77
78 Args:
79 path: The path to the Google Storage object or bucket, e.g.
80 '/mybucket/myfile' or '/mybucket'.
81 **kwd: Options for urlfetch. e.g.
82 headers={'content-type': 'text/plain'}, payload='blah'.
83
84 Returns:
85 A ndb Future. When fulfilled, future.get_result() should return
86 a tuple of (status, headers, content) that represents a HTTP response
87 of Google Cloud Storage XML API.
88 """
89
90 api_url = 'https://storage.googleapis.com'
91 read_only_scope = 'https://www.googleapis.com/auth/devstorage.read_only'
92 read_write_scope = 'https://www.googleapis.com/auth/devstorage.read_write'
93 full_control_scope = 'https://www.googleapis.com/auth/devstorage.full_control'
94
95 def __getstate__(self):
96 """Store state as part of serialization/pickling.
97
98 Returns:
99 A tuple (of dictionaries) with the state of this object
100 """
101 return (super(_StorageApi, self).__getstate__(), {'api_url': self.api_url})
102
103 def __setstate__(self, state):
104 """Restore state as part of deserialization/unpickling.
105
106 Args:
107 state: the tuple from a __getstate__ call
108 """
109 superstate, localstate = state
110 super(_StorageApi, self).__setstate__(superstate)
111 self.api_url = localstate['api_url']
112
113 @api_utils._eager_tasklet
114 @ndb.tasklet
115 def do_request_async(self, url, method='GET', headers=None, payload=None,
116 deadline=None, callback=None):
117 """Inherit docs.
118
119 This method translates urlfetch exceptions to more service specific ones.
120 """
121 if headers is None:
122 headers = {}
123 if 'x-goog-api-version' not in headers:
124 headers['x-goog-api-version'] = '2'
125 headers['accept-encoding'] = 'gzip, *'
126 try:
127 resp_tuple = yield super(_StorageApi, self).do_request_async(
128 url, method=method, headers=headers, payload=payload,
129 deadline=deadline, callback=callback)
130 except urlfetch.DownloadError, e:
131 raise errors.TimeoutError(
132 'Request to Google Cloud Storage timed out.', e)
133
134 raise ndb.Return(resp_tuple)
135
136
137 def post_object_async(self, path, **kwds):
138 """POST to an object."""
139 return self.do_request_async(self.api_url + path, 'POST', **kwds)
140
141 def put_object_async(self, path, **kwds):
142 """PUT an object."""
143 return self.do_request_async(self.api_url + path, 'PUT', **kwds)
144
145 def get_object_async(self, path, **kwds):
146 """GET an object.
147
148 Note: No payload argument is supported.
149 """
150 return self.do_request_async(self.api_url + path, 'GET', **kwds)
151
152 def delete_object_async(self, path, **kwds):
153 """DELETE an object.
154
155 Note: No payload argument is supported.
156 """
157 return self.do_request_async(self.api_url + path, 'DELETE', **kwds)
158
159 def head_object_async(self, path, **kwds):
160 """HEAD an object.
161
162 Depending on request headers, HEAD returns various object properties,
163 e.g. Content-Length, Last-Modified, and ETag.
164
165 Note: No payload argument is supported.
166 """
167 return self.do_request_async(self.api_url + path, 'HEAD', **kwds)
168
169 def get_bucket_async(self, path, **kwds):
170 """GET a bucket."""
171 return self.do_request_async(self.api_url + path, 'GET', **kwds)
172
173
174 _StorageApi = rest_api.add_sync_methods(_StorageApi)
175
176
177 class ReadBuffer(object):
178 """A class for reading Google storage files."""
179
180 DEFAULT_BUFFER_SIZE = 1024 * 1024
181 MAX_REQUEST_SIZE = 30 * DEFAULT_BUFFER_SIZE
182
183 def __init__(self,
184 api,
185 path,
186 buffer_size=DEFAULT_BUFFER_SIZE,
187 max_request_size=MAX_REQUEST_SIZE):
188 """Constructor.
189
190 Args:
191 api: A StorageApi instance.
192 path: Quoted/escaped path to the object, e.g. /mybucket/myfile
193 buffer_size: buffer size. The ReadBuffer keeps
194 one buffer. But there may be a pending future that contains
195 a second buffer. This size must be less than max_request_size.
196 max_request_size: Max bytes to request in one urlfetch.
197 """
198 self._api = api
199 self._path = path
200 self.name = api_utils._unquote_filename(path)
201 self.closed = False
202
203 assert buffer_size <= max_request_size
204 self._buffer_size = buffer_size
205 self._max_request_size = max_request_size
206 self._offset = 0
207 self._buffer = _Buffer()
208 self._etag = None
209
210 self._request_next_buffer()
211
212 status, headers, _ = self._api.head_object(path)
213 errors.check_status(status, [200], path, resp_headers=headers)
214 self._file_size = long(headers['content-length'])
215 self._check_etag(headers.get('etag'))
216 if self._file_size == 0:
217 self._buffer_future = None
218
219 def __getstate__(self):
220 """Store state as part of serialization/pickling.
221
222 The contents of the read buffer are not stored, only the current offset for
223 data read by the client. A new read buffer is established at unpickling.
224 The head information for the object (file size and etag) are stored to
225 reduce startup and ensure the file has not changed.
226
227 Returns:
228 A dictionary with the state of this object
229 """
230 return {'api': self._api,
231 'path': self._path,
232 'buffer_size': self._buffer_size,
233 'request_size': self._max_request_size,
234 'etag': self._etag,
235 'size': self._file_size,
236 'offset': self._offset,
237 'closed': self.closed}
238
239 def __setstate__(self, state):
240 """Restore state as part of deserialization/unpickling.
241
242 Args:
243 state: the dictionary from a __getstate__ call
244
245 Along with restoring the state, pre-fetch the next read buffer.
246 """
247 self._api = state['api']
248 self._path = state['path']
249 self.name = api_utils._unquote_filename(self._path)
250 self._buffer_size = state['buffer_size']
251 self._max_request_size = state['request_size']
252 self._etag = state['etag']
253 self._file_size = state['size']
254 self._offset = state['offset']
255 self._buffer = _Buffer()
256 self.closed = state['closed']
257 self._buffer_future = None
258 if self._remaining() and not self.closed:
259 self._request_next_buffer()
260
261 def __iter__(self):
262 """Iterator interface.
263
264 Note the ReadBuffer container itself is the iterator. It's
265 (quote PEP0234)
266 'destructive: they consumes all the values and a second iterator
267 cannot easily be created that iterates independently over the same values.
268 You could open the file for the second time, or seek() to the beginning.'
269
270 Returns:
271 Self.
272 """
273 return self
274
275 def next(self):
276 line = self.readline()
277 if not line:
278 raise StopIteration()
279 return line
280
281 def readline(self, size=-1):
282 """Read one line delimited by '\n' from the file.
283
284 A trailing newline character is kept in the string. It may be absent when a
285 file ends with an incomplete line. If the size argument is non-negative,
286 it specifies the maximum string size (counting the newline) to return.
287 A negative size is the same as unspecified. Empty string is returned
288 only when EOF is encountered immediately.
289
290 Args:
291 size: Maximum number of bytes to read. If not specified, readline stops
292 only on '\n' or EOF.
293
294 Returns:
295 The data read as a string.
296
297 Raises:
298 IOError: When this buffer is closed.
299 """
300 self._check_open()
301 if size == 0 or not self._remaining():
302 return ''
303
304 data_list = []
305 newline_offset = self._buffer.find_newline(size)
306 while newline_offset < 0:
307 data = self._buffer.read(size)
308 size -= len(data)
309 self._offset += len(data)
310 data_list.append(data)
311 if size == 0 or not self._remaining():
312 return ''.join(data_list)
313 self._buffer.reset(self._buffer_future.get_result())
314 self._request_next_buffer()
315 newline_offset = self._buffer.find_newline(size)
316
317 data = self._buffer.read_to_offset(newline_offset + 1)
318 self._offset += len(data)
319 data_list.append(data)
320
321 return ''.join(data_list)
322
323 def read(self, size=-1):
324 """Read data from RAW file.
325
326 Args:
327 size: Number of bytes to read as integer. Actual number of bytes
328 read is always equal to size unless EOF is reached. If size is
329 negative or unspecified, read the entire file.
330
331 Returns:
332 data read as str.
333
334 Raises:
335 IOError: When this buffer is closed.
336 """
337 self._check_open()
338 if not self._remaining():
339 return ''
340
341 data_list = []
342 while True:
343 remaining = self._buffer.remaining()
344 if size >= 0 and size < remaining:
345 data_list.append(self._buffer.read(size))
346 self._offset += size
347 break
348 else:
349 size -= remaining
350 self._offset += remaining
351 data_list.append(self._buffer.read())
352
353 if self._buffer_future is None:
354 if size < 0 or size >= self._remaining():
355 needs = self._remaining()
356 else:
357 needs = size
358 data_list.extend(self._get_segments(self._offset, needs))
359 self._offset += needs
360 break
361
362 if self._buffer_future:
363 self._buffer.reset(self._buffer_future.get_result())
364 self._buffer_future = None
365
366 if self._buffer_future is None:
367 self._request_next_buffer()
368 return ''.join(data_list)
369
370 def _remaining(self):
371 return self._file_size - self._offset
372
373 def _request_next_buffer(self):
374 """Request next buffer.
375
376 Requires self._offset and self._buffer are in consistent state
377 """
378 self._buffer_future = None
379 next_offset = self._offset + self._buffer.remaining()
380 if not hasattr(self, '_file_size') or next_offset != self._file_size:
381 self._buffer_future = self._get_segment(next_offset,
382 self._buffer_size)
383
384 def _get_segments(self, start, request_size):
385 """Get segments of the file from Google Storage as a list.
386
387 A large request is broken into segments to avoid hitting urlfetch
388 response size limit. Each segment is returned from a separate urlfetch.
389
390 Args:
391 start: start offset to request. Inclusive. Have to be within the
392 range of the file.
393 request_size: number of bytes to request.
394
395 Returns:
396 A list of file segments in order
397 """
398 if not request_size:
399 return []
400
401 end = start + request_size
402 futures = []
403
404 while request_size > self._max_request_size:
405 futures.append(self._get_segment(start, self._max_request_size))
406 request_size -= self._max_request_size
407 start += self._max_request_size
408 if start < end:
409 futures.append(self._get_segment(start, end-start))
410 return [fut.get_result() for fut in futures]
411
412 @ndb.tasklet
413 def _get_segment(self, start, request_size):
414 """Get a segment of the file from Google Storage.
415
416 Args:
417 start: start offset of the segment. Inclusive. Have to be within the
418 range of the file.
419 request_size: number of bytes to request. Have to be small enough
420 for a single urlfetch request. May go over the logical range of the
421 file.
422
423 Yields:
424 a segment [start, start + request_size) of the file.
425
426 Raises:
427 ValueError: if the file has changed while reading.
428 """
429 end = start + request_size - 1
430 content_range = '%d-%d' % (start, end)
431 headers = {'Range': 'bytes=' + content_range}
432 status, resp_headers, content = yield self._api.get_object_async(
433 self._path, headers=headers)
434 errors.check_status(status, [200, 206], self._path, headers, resp_headers)
435 self._check_etag(resp_headers.get('etag'))
436 raise ndb.Return(content)
437
438 def _check_etag(self, etag):
439 """Check if etag is the same across requests to GCS.
440
441 If self._etag is None, set it. If etag is set, check that the new
442 etag equals the old one.
443
444 In the __init__ method, we fire one HEAD and one GET request using
445 ndb tasklet. One of them would return first and set the first value.
446
447 Args:
448 etag: etag from a GCS HTTP response. None if etag is not part of the
449 response header. It could be None for example in the case of GCS
450 composite file.
451
452 Raises:
453 ValueError: if two etags are not equal.
454 """
455 if etag is None:
456 return
457 elif self._etag is None:
458 self._etag = etag
459 elif self._etag != etag:
460 raise ValueError('File on GCS has changed while reading.')
461
462 def close(self):
463 self.closed = True
464 self._buffer = None
465 self._buffer_future = None
466
467 def __enter__(self):
468 return self
469
470 def __exit__(self, atype, value, traceback):
471 self.close()
472 return False
473
474 def seek(self, offset, whence=os.SEEK_SET):
475 """Set the file's current offset.
476
477 Note if the new offset is out of bound, it is adjusted to either 0 or EOF.
478
479 Args:
480 offset: seek offset as number.
481 whence: seek mode. Supported modes are os.SEEK_SET (absolute seek),
482 os.SEEK_CUR (seek relative to the current position), and os.SEEK_END
483 (seek relative to the end, offset should be negative).
484
485 Raises:
486 IOError: When this buffer is closed.
487 ValueError: When whence is invalid.
488 """
489 self._check_open()
490
491 self._buffer.reset()
492 self._buffer_future = None
493
494 if whence == os.SEEK_SET:
495 self._offset = offset
496 elif whence == os.SEEK_CUR:
497 self._offset += offset
498 elif whence == os.SEEK_END:
499 self._offset = self._file_size + offset
500 else:
501 raise ValueError('Whence mode %s is invalid.' % str(whence))
502
503 self._offset = min(self._offset, self._file_size)
504 self._offset = max(self._offset, 0)
505 if self._remaining():
506 self._request_next_buffer()
507
508 def tell(self):
509 """Tell the file's current offset.
510
511 Returns:
512 current offset in reading this file.
513
514 Raises:
515 IOError: When this buffer is closed.
516 """
517 self._check_open()
518 return self._offset
519
520 def _check_open(self):
521 if self.closed:
522 raise IOError('Buffer is closed.')
523
524 def seekable(self):
525 return True
526
527 def readable(self):
528 return True
529
530 def writable(self):
531 return False
532
533
534 class _Buffer(object):
535 """In memory buffer."""
536
537 def __init__(self):
538 self.reset()
539
540 def reset(self, content='', offset=0):
541 self._buffer = content
542 self._offset = offset
543
544 def read(self, size=-1):
545 """Returns bytes from self._buffer and update related offsets.
546
547 Args:
548 size: number of bytes to read starting from current offset.
549 Read the entire buffer if negative.
550
551 Returns:
552 Requested bytes from buffer.
553 """
554 if size < 0:
555 offset = len(self._buffer)
556 else:
557 offset = self._offset + size
558 return self.read_to_offset(offset)
559
560 def read_to_offset(self, offset):
561 """Returns bytes from self._buffer and update related offsets.
562
563 Args:
564 offset: read from current offset to this offset, exclusive.
565
566 Returns:
567 Requested bytes from buffer.
568 """
569 assert offset >= self._offset
570 result = self._buffer[self._offset: offset]
571 self._offset += len(result)
572 return result
573
574 def remaining(self):
575 return len(self._buffer) - self._offset
576
577 def find_newline(self, size=-1):
578 """Search for newline char in buffer starting from current offset.
579
580 Args:
581 size: number of bytes to search. -1 means all.
582
583 Returns:
584 offset of newline char in buffer. -1 if doesn't exist.
585 """
586 if size < 0:
587 return self._buffer.find('\n', self._offset)
588 return self._buffer.find('\n', self._offset, self._offset + size)
589
590
591 class StreamingBuffer(object):
592 """A class for creating large objects using the 'resumable' API.
593
594 The API is a subset of the Python writable stream API sufficient to
595 support writing zip files using the zipfile module.
596
597 The exact sequence of calls and use of headers is documented at
598 https://developers.google.com/storage/docs/developer-guide#unknownresumables
599 """
600
601 _blocksize = 256 * 1024
602
603 _maxrequestsize = 16 * _blocksize
604
605 def __init__(self,
606 api,
607 path,
608 content_type=None,
609 gcs_headers=None):
610 """Constructor.
611
612 Args:
613 api: A StorageApi instance.
614 path: Quoted/escaped path to the object, e.g. /mybucket/myfile
615 content_type: Optional content-type; Default value is
616 delegate to Google Cloud Storage.
617 gcs_headers: additional gs headers as a str->str dict, e.g
618 {'x-goog-acl': 'private', 'x-goog-meta-foo': 'foo'}.
619 """
620 assert self._maxrequestsize > self._blocksize
621 assert self._maxrequestsize % self._blocksize == 0
622
623 self._api = api
624 self._path = path
625 self.name = api_utils._unquote_filename(path)
626 self.closed = False
627
628 self._buffer = collections.deque()
629 self._buffered = 0
630 self._written = 0
631 self._offset = 0
632
633 headers = {'x-goog-resumable': 'start'}
634 if content_type:
635 headers['content-type'] = content_type
636 if gcs_headers:
637 headers.update(gcs_headers)
638 status, resp_headers, _ = self._api.post_object(path, headers=headers)
639 errors.check_status(status, [201], path, headers, resp_headers)
640 loc = resp_headers.get('location')
641 if not loc:
642 raise IOError('No location header found in 201 response')
643 parsed = urlparse.urlparse(loc)
644 self._path_with_token = '%s?%s' % (self._path, parsed.query)
645
646 def __getstate__(self):
647 """Store state as part of serialization/pickling.
648
649 The contents of the write buffer are stored. Writes to the underlying
650 storage are required to be on block boundaries (_blocksize) except for the
651 last write. In the worst case the pickled version of this object may be
652 slightly larger than the blocksize.
653
654 Returns:
655 A dictionary with the state of this object
656
657 """
658 return {'api': self._api,
659 'path': self._path,
660 'path_token': self._path_with_token,
661 'buffer': self._buffer,
662 'buffered': self._buffered,
663 'written': self._written,
664 'offset': self._offset,
665 'closed': self.closed}
666
667 def __setstate__(self, state):
668 """Restore state as part of deserialization/unpickling.
669
670 Args:
671 state: the dictionary from a __getstate__ call
672 """
673 self._api = state['api']
674 self._path_with_token = state['path_token']
675 self._buffer = state['buffer']
676 self._buffered = state['buffered']
677 self._written = state['written']
678 self._offset = state['offset']
679 self.closed = state['closed']
680 self._path = state['path']
681 self.name = api_utils._unquote_filename(self._path)
682
683 def write(self, data):
684 """Write some bytes.
685
686 Args:
687 data: data to write. str.
688
689 Raises:
690 TypeError: if data is not of type str.
691 """
692 self._check_open()
693 if not isinstance(data, str):
694 raise TypeError('Expected str but got %s.' % type(data))
695 if not data:
696 return
697 self._buffer.append(data)
698 self._buffered += len(data)
699 self._offset += len(data)
700 if self._buffered >= self._blocksize:
701 self._flush()
702
703 def flush(self):
704 """Dummy API.
705
706 This API is provided because the zipfile module uses it. It is a
707 no-op because Google Storage *requires* that all writes except for
708 the final one are multiples on 256K bytes aligned on 256K-byte
709 boundaries.
710 """
711 self._check_open()
712
713 def tell(self):
714 """Return the total number of bytes passed to write() so far.
715
716 (There is no seek() method.)
717 """
718 return self._offset
719
720 def close(self):
721 """Flush the buffer and finalize the file.
722
723 When this returns the new file is available for reading.
724 """
725 if not self.closed:
726 self.closed = True
727 self._flush(finish=True)
728 self._buffer = None
729
730 def __enter__(self):
731 return self
732
733 def __exit__(self, atype, value, traceback):
734 self.close()
735 return False
736
737 def _flush(self, finish=False):
738 """Internal API to flush.
739
740 This is called only when the total amount of buffered data is at
741 least self._blocksize, or to flush the final (incomplete) block of
742 the file with finish=True.
743 """
744 flush_len = 0 if finish else self._blocksize
745
746 while self._buffered >= flush_len:
747 buffer = []
748 buffered = 0
749
750 while self._buffer:
751 buf = self._buffer.popleft()
752 size = len(buf)
753 self._buffered -= size
754 buffer.append(buf)
755 buffered += size
756 if buffered >= self._maxrequestsize:
757 break
758
759 if buffered > self._maxrequestsize:
760 excess = buffered - self._maxrequestsize
761 elif finish:
762 excess = 0
763 else:
764 excess = buffered % self._blocksize
765
766 if excess:
767 over = buffer.pop()
768 size = len(over)
769 assert size >= excess
770 buffered -= size
771 head, tail = over[:-excess], over[-excess:]
772 self._buffer.appendleft(tail)
773 self._buffered += len(tail)
774 if head:
775 buffer.append(head)
776 buffered += len(head)
777
778 data = ''.join(buffer)
779 file_len = '*'
780 if finish and not self._buffered:
781 file_len = self._written + len(data)
782 self._send_data(data, self._written, file_len)
783 self._written += len(data)
784 if file_len != '*':
785 break
786
787 def _send_data(self, data, start_offset, file_len):
788 """Send the block to the storage service.
789
790 This is a utility method that does not modify self.
791
792 Args:
793 data: data to send in str.
794 start_offset: start offset of the data in relation to the file.
795 file_len: an int if this is the last data to append to the file.
796 Otherwise '*'.
797 """
798 headers = {}
799 end_offset = start_offset + len(data) - 1
800
801 if data:
802 headers['content-range'] = ('bytes %d-%d/%s' %
803 (start_offset, end_offset, file_len))
804 else:
805 headers['content-range'] = ('bytes */%s' % file_len)
806
807 status, response_headers, _ = self._api.put_object(
808 self._path_with_token, payload=data, headers=headers)
809 if file_len == '*':
810 expected = 308
811 else:
812 expected = 200
813 errors.check_status(status, [expected], self._path, headers,
814 response_headers,
815 {'upload_path': self._path_with_token})
816
817 def _get_offset_from_gcs(self):
818 """Get the last offset that has been written to GCS.
819
820 This is a utility method that does not modify self.
821
822 Returns:
823 an int of the last offset written to GCS by this upload, inclusive.
824 -1 means nothing has been written.
825 """
826 headers = {'content-range': 'bytes */*'}
827 status, response_headers, _ = self._api.put_object(
828 self._path_with_token, headers=headers)
829 errors.check_status(status, [308], self._path, headers,
830 response_headers,
831 {'upload_path': self._path_with_token})
832 val = response_headers.get('range')
833 if val is None:
834 return -1
835 _, offset = val.rsplit('-', 1)
836 return int(offset)
837
838 def _force_close(self, file_length=None):
839 """Close this buffer on file_length.
840
841 Finalize this upload immediately on file_length.
842 Contents that are still in memory will not be uploaded.
843
844 This is a utility method that does not modify self.
845
846 Args:
847 file_length: file length. Must match what has been uploaded. If None,
848 it will be queried from GCS.
849 """
850 if file_length is None:
851 file_length = self._get_offset_from_gcs() + 1
852 self._send_data('', 0, file_length)
853
854 def _check_open(self):
855 if self.closed:
856 raise IOError('Buffer is closed.')
857
858 def seekable(self):
859 return False
860
861 def readable(self):
862 return False
863
864 def writable(self):
865 return True
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698