| OLD | NEW |
| (Empty) | |
| 1 # Copyright 2012 Google Inc. All Rights Reserved. |
| 2 |
| 3 """File Interface for Google Cloud Storage.""" |
| 4 |
| 5 |
| 6 |
| 7 from __future__ import with_statement |
| 8 |
| 9 |
| 10 |
| 11 __all__ = ['delete', |
| 12 'listbucket', |
| 13 'open', |
| 14 'stat', |
| 15 ] |
| 16 |
| 17 import logging |
| 18 import StringIO |
| 19 import urllib |
| 20 import xml.etree.cElementTree as ET |
| 21 from . import api_utils |
| 22 from . import common |
| 23 from . import errors |
| 24 from . import storage_api |
| 25 |
| 26 |
| 27 |
| 28 def open(filename, |
| 29 mode='r', |
| 30 content_type=None, |
| 31 options=None, |
| 32 read_buffer_size=storage_api.ReadBuffer.DEFAULT_BUFFER_SIZE, |
| 33 retry_params=None, |
| 34 _account_id=None): |
| 35 """Opens a Google Cloud Storage file and returns it as a File-like object. |
| 36 |
| 37 Args: |
| 38 filename: A Google Cloud Storage filename of form '/bucket/filename'. |
| 39 mode: 'r' for reading mode. 'w' for writing mode. |
| 40 In reading mode, the file must exist. In writing mode, a file will |
| 41 be created or be overrode. |
| 42 content_type: The MIME type of the file. str. Only valid in writing mode. |
| 43 options: A str->basestring dict to specify additional headers to pass to |
| 44 GCS e.g. {'x-goog-acl': 'private', 'x-goog-meta-foo': 'foo'}. |
| 45 Supported options are x-goog-acl, x-goog-meta-, cache-control, |
| 46 content-disposition, and content-encoding. |
| 47 Only valid in writing mode. |
| 48 See https://developers.google.com/storage/docs/reference-headers |
| 49 for details. |
| 50 read_buffer_size: The buffer size for read. Read keeps a buffer |
| 51 and prefetches another one. To minimize blocking for large files, |
| 52 always read by buffer size. To minimize number of RPC requests for |
| 53 small files, set a large buffer size. Max is 30MB. |
| 54 retry_params: An instance of api_utils.RetryParams for subsequent calls |
| 55 to GCS from this file handle. If None, the default one is used. |
| 56 _account_id: Internal-use only. |
| 57 |
| 58 Returns: |
| 59 A reading or writing buffer that supports File-like interface. Buffer |
| 60 must be closed after operations are done. |
| 61 |
| 62 Raises: |
| 63 errors.AuthorizationError: if authorization failed. |
| 64 errors.NotFoundError: if an object that's expected to exist doesn't. |
| 65 ValueError: invalid open mode or if content_type or options are specified |
| 66 in reading mode. |
| 67 """ |
| 68 common.validate_file_path(filename) |
| 69 api = _get_storage_api(retry_params=retry_params, account_id=_account_id) |
| 70 filename = api_utils._quote_filename(filename) |
| 71 |
| 72 if mode == 'w': |
| 73 common.validate_options(options) |
| 74 return storage_api.StreamingBuffer(api, filename, content_type, options) |
| 75 elif mode == 'r': |
| 76 if content_type or options: |
| 77 raise ValueError('Options and content_type can only be specified ' |
| 78 'for writing mode.') |
| 79 return storage_api.ReadBuffer(api, |
| 80 filename, |
| 81 buffer_size=read_buffer_size) |
| 82 else: |
| 83 raise ValueError('Invalid mode %s.' % mode) |
| 84 |
| 85 |
| 86 def delete(filename, retry_params=None, _account_id=None): |
| 87 """Delete a Google Cloud Storage file. |
| 88 |
| 89 Args: |
| 90 filename: A Google Cloud Storage filename of form '/bucket/filename'. |
| 91 retry_params: An api_utils.RetryParams for this call to GCS. If None, |
| 92 the default one is used. |
| 93 _account_id: Internal-use only. |
| 94 |
| 95 Raises: |
| 96 errors.NotFoundError: if the file doesn't exist prior to deletion. |
| 97 """ |
| 98 api = _get_storage_api(retry_params=retry_params, account_id=_account_id) |
| 99 common.validate_file_path(filename) |
| 100 filename = api_utils._quote_filename(filename) |
| 101 status, resp_headers, _ = api.delete_object(filename) |
| 102 errors.check_status(status, [204], filename, resp_headers=resp_headers) |
| 103 |
| 104 |
| 105 def stat(filename, retry_params=None, _account_id=None): |
| 106 """Get GCSFileStat of a Google Cloud storage file. |
| 107 |
| 108 Args: |
| 109 filename: A Google Cloud Storage filename of form '/bucket/filename'. |
| 110 retry_params: An api_utils.RetryParams for this call to GCS. If None, |
| 111 the default one is used. |
| 112 _account_id: Internal-use only. |
| 113 |
| 114 Returns: |
| 115 a GCSFileStat object containing info about this file. |
| 116 |
| 117 Raises: |
| 118 errors.AuthorizationError: if authorization failed. |
| 119 errors.NotFoundError: if an object that's expected to exist doesn't. |
| 120 """ |
| 121 common.validate_file_path(filename) |
| 122 api = _get_storage_api(retry_params=retry_params, account_id=_account_id) |
| 123 status, headers, _ = api.head_object(api_utils._quote_filename(filename)) |
| 124 errors.check_status(status, [200], filename, resp_headers=headers) |
| 125 file_stat = common.GCSFileStat( |
| 126 filename=filename, |
| 127 st_size=headers.get('content-length'), |
| 128 st_ctime=common.http_time_to_posix(headers.get('last-modified')), |
| 129 etag=headers.get('etag'), |
| 130 content_type=headers.get('content-type'), |
| 131 metadata=common.get_metadata(headers)) |
| 132 |
| 133 return file_stat |
| 134 |
| 135 |
| 136 def _copy2(src, dst, retry_params=None): |
| 137 """Copy the file content and metadata from src to dst. |
| 138 |
| 139 Internal use only! |
| 140 |
| 141 Args: |
| 142 src: /bucket/filename |
| 143 dst: /bucket/filename |
| 144 retry_params: An api_utils.RetryParams for this call to GCS. If None, |
| 145 the default one is used. |
| 146 |
| 147 Raises: |
| 148 errors.AuthorizationError: if authorization failed. |
| 149 errors.NotFoundError: if an object that's expected to exist doesn't. |
| 150 """ |
| 151 common.validate_file_path(src) |
| 152 common.validate_file_path(dst) |
| 153 if src == dst: |
| 154 return |
| 155 |
| 156 api = _get_storage_api(retry_params=retry_params) |
| 157 headers = {'x-goog-copy-source': src, 'Content-Length': '0'} |
| 158 status, resp_headers, _ = api.put_object( |
| 159 api_utils._quote_filename(dst), headers=headers) |
| 160 errors.check_status(status, [200], src, headers, resp_headers) |
| 161 |
| 162 |
| 163 def listbucket(path_prefix, marker=None, prefix=None, max_keys=None, |
| 164 delimiter=None, retry_params=None, _account_id=None): |
| 165 """Returns a GCSFileStat iterator over a bucket. |
| 166 |
| 167 Optional arguments can limit the result to a subset of files under bucket. |
| 168 |
| 169 This function has two modes: |
| 170 1. List bucket mode: Lists all files in the bucket without any concept of |
| 171 hierarchy. GCS doesn't have real directory hierarchies. |
| 172 2. Directory emulation mode: If you specify the 'delimiter' argument, |
| 173 it is used as a path separator to emulate a hierarchy of directories. |
| 174 In this mode, the "path_prefix" argument should end in the delimiter |
| 175 specified (thus designates a logical directory). The logical directory's |
| 176 contents, both files and subdirectories, are listed. The names of |
| 177 subdirectories returned will end with the delimiter. So listbucket |
| 178 can be called with the subdirectory name to list the subdirectory's |
| 179 contents. |
| 180 |
| 181 Args: |
| 182 path_prefix: A Google Cloud Storage path of format "/bucket" or |
| 183 "/bucket/prefix". Only objects whose fullpath starts with the |
| 184 path_prefix will be returned. |
| 185 marker: Another path prefix. Only objects whose fullpath starts |
| 186 lexicographically after marker will be returned (exclusive). |
| 187 prefix: Deprecated. Use path_prefix. |
| 188 max_keys: The limit on the number of objects to return. int. |
| 189 For best performance, specify max_keys only if you know how many objects |
| 190 you want. Otherwise, this method requests large batches and handles |
| 191 pagination for you. |
| 192 delimiter: Use to turn on directory mode. str of one or multiple chars |
| 193 that your bucket uses as its directory separator. |
| 194 retry_params: An api_utils.RetryParams for this call to GCS. If None, |
| 195 the default one is used. |
| 196 _account_id: Internal-use only. |
| 197 |
| 198 Examples: |
| 199 For files "/bucket/a", |
| 200 "/bucket/bar/1" |
| 201 "/bucket/foo", |
| 202 "/bucket/foo/1", "/bucket/foo/2/1", "/bucket/foo/3/1", |
| 203 |
| 204 Regular mode: |
| 205 listbucket("/bucket/f", marker="/bucket/foo/1") |
| 206 will match "/bucket/foo/2/1", "/bucket/foo/3/1". |
| 207 |
| 208 Directory mode: |
| 209 listbucket("/bucket/", delimiter="/") |
| 210 will match "/bucket/a, "/bucket/bar/" "/bucket/foo", "/bucket/foo/". |
| 211 listbucket("/bucket/foo/", delimiter="/") |
| 212 will match "/bucket/foo/1", "/bucket/foo/2/", "/bucket/foo/3/" |
| 213 |
| 214 Returns: |
| 215 Regular mode: |
| 216 A GCSFileStat iterator over matched files ordered by filename. |
| 217 The iterator returns GCSFileStat objects. filename, etag, st_size, |
| 218 st_ctime, and is_dir are set. |
| 219 |
| 220 Directory emulation mode: |
| 221 A GCSFileStat iterator over matched files and directories ordered by |
| 222 name. The iterator returns GCSFileStat objects. For directories, |
| 223 only the filename and is_dir fields are set. |
| 224 |
| 225 The last name yielded can be used as next call's marker. |
| 226 """ |
| 227 if prefix: |
| 228 common.validate_bucket_path(path_prefix) |
| 229 bucket = path_prefix |
| 230 else: |
| 231 bucket, prefix = common._process_path_prefix(path_prefix) |
| 232 |
| 233 if marker and marker.startswith(bucket): |
| 234 marker = marker[len(bucket) + 1:] |
| 235 |
| 236 api = _get_storage_api(retry_params=retry_params, account_id=_account_id) |
| 237 options = {} |
| 238 if marker: |
| 239 options['marker'] = marker |
| 240 if max_keys: |
| 241 options['max-keys'] = max_keys |
| 242 if prefix: |
| 243 options['prefix'] = prefix |
| 244 if delimiter: |
| 245 options['delimiter'] = delimiter |
| 246 |
| 247 return _Bucket(api, bucket, options) |
| 248 |
| 249 |
| 250 class _Bucket(object): |
| 251 """A wrapper for a GCS bucket as the return value of listbucket.""" |
| 252 |
| 253 def __init__(self, api, path, options): |
| 254 """Initialize. |
| 255 |
| 256 Args: |
| 257 api: storage_api instance. |
| 258 path: bucket path of form '/bucket'. |
| 259 options: a dict of listbucket options. Please see listbucket doc. |
| 260 """ |
| 261 self._init(api, path, options) |
| 262 |
| 263 def _init(self, api, path, options): |
| 264 self._api = api |
| 265 self._path = path |
| 266 self._options = options.copy() |
| 267 self._get_bucket_fut = self._api.get_bucket_async( |
| 268 self._path + '?' + urllib.urlencode(self._options)) |
| 269 self._last_yield = None |
| 270 self._new_max_keys = self._options.get('max-keys') |
| 271 |
| 272 def __getstate__(self): |
| 273 options = self._options |
| 274 if self._last_yield: |
| 275 options['marker'] = self._last_yield.filename[len(self._path) + 1:] |
| 276 if self._new_max_keys is not None: |
| 277 options['max-keys'] = self._new_max_keys |
| 278 return {'api': self._api, |
| 279 'path': self._path, |
| 280 'options': options} |
| 281 |
| 282 def __setstate__(self, state): |
| 283 self._init(state['api'], state['path'], state['options']) |
| 284 |
| 285 def __iter__(self): |
| 286 """Iter over the bucket. |
| 287 |
| 288 Yields: |
| 289 GCSFileStat: a GCSFileStat for an object in the bucket. |
| 290 They are ordered by GCSFileStat.filename. |
| 291 """ |
| 292 total = 0 |
| 293 max_keys = self._options.get('max-keys') |
| 294 |
| 295 while self._get_bucket_fut: |
| 296 status, resp_headers, content = self._get_bucket_fut.get_result() |
| 297 errors.check_status(status, [200], self._path, resp_headers=resp_headers, |
| 298 extras=self._options) |
| 299 |
| 300 if self._should_get_another_batch(content): |
| 301 self._get_bucket_fut = self._api.get_bucket_async( |
| 302 self._path + '?' + urllib.urlencode(self._options)) |
| 303 else: |
| 304 self._get_bucket_fut = None |
| 305 |
| 306 root = ET.fromstring(content) |
| 307 dirs = self._next_dir_gen(root) |
| 308 files = self._next_file_gen(root) |
| 309 next_file = files.next() |
| 310 next_dir = dirs.next() |
| 311 |
| 312 while ((max_keys is None or total < max_keys) and |
| 313 not (next_file is None and next_dir is None)): |
| 314 total += 1 |
| 315 if next_file is None: |
| 316 self._last_yield = next_dir |
| 317 next_dir = dirs.next() |
| 318 elif next_dir is None: |
| 319 self._last_yield = next_file |
| 320 next_file = files.next() |
| 321 elif next_dir < next_file: |
| 322 self._last_yield = next_dir |
| 323 next_dir = dirs.next() |
| 324 elif next_file < next_dir: |
| 325 self._last_yield = next_file |
| 326 next_file = files.next() |
| 327 else: |
| 328 logging.error( |
| 329 'Should never reach. next file is %r. next dir is %r.', |
| 330 next_file, next_dir) |
| 331 if self._new_max_keys: |
| 332 self._new_max_keys -= 1 |
| 333 yield self._last_yield |
| 334 |
| 335 def _next_file_gen(self, root): |
| 336 """Generator for next file element in the document. |
| 337 |
| 338 Args: |
| 339 root: root element of the XML tree. |
| 340 |
| 341 Yields: |
| 342 GCSFileStat for the next file. |
| 343 """ |
| 344 for e in root.getiterator(common._T_CONTENTS): |
| 345 st_ctime, size, etag, key = None, None, None, None |
| 346 for child in e.getiterator('*'): |
| 347 if child.tag == common._T_LAST_MODIFIED: |
| 348 st_ctime = common.dt_str_to_posix(child.text) |
| 349 elif child.tag == common._T_ETAG: |
| 350 etag = child.text |
| 351 elif child.tag == common._T_SIZE: |
| 352 size = child.text |
| 353 elif child.tag == common._T_KEY: |
| 354 key = child.text |
| 355 yield common.GCSFileStat(self._path + '/' + key, |
| 356 size, etag, st_ctime) |
| 357 e.clear() |
| 358 yield None |
| 359 |
| 360 def _next_dir_gen(self, root): |
| 361 """Generator for next directory element in the document. |
| 362 |
| 363 Args: |
| 364 root: root element in the XML tree. |
| 365 |
| 366 Yields: |
| 367 GCSFileStat for the next directory. |
| 368 """ |
| 369 for e in root.getiterator(common._T_COMMON_PREFIXES): |
| 370 yield common.GCSFileStat( |
| 371 self._path + '/' + e.find(common._T_PREFIX).text, |
| 372 st_size=None, etag=None, st_ctime=None, is_dir=True) |
| 373 e.clear() |
| 374 yield None |
| 375 |
| 376 def _should_get_another_batch(self, content): |
| 377 """Whether to issue another GET bucket call. |
| 378 |
| 379 Args: |
| 380 content: response XML. |
| 381 |
| 382 Returns: |
| 383 True if should, also update self._options for the next request. |
| 384 False otherwise. |
| 385 """ |
| 386 if ('max-keys' in self._options and |
| 387 self._options['max-keys'] <= common._MAX_GET_BUCKET_RESULT): |
| 388 return False |
| 389 |
| 390 elements = self._find_elements( |
| 391 content, set([common._T_IS_TRUNCATED, |
| 392 common._T_NEXT_MARKER])) |
| 393 if elements.get(common._T_IS_TRUNCATED, 'false').lower() != 'true': |
| 394 return False |
| 395 |
| 396 next_marker = elements.get(common._T_NEXT_MARKER) |
| 397 if next_marker is None: |
| 398 self._options.pop('marker', None) |
| 399 return False |
| 400 self._options['marker'] = next_marker |
| 401 return True |
| 402 |
| 403 def _find_elements(self, result, elements): |
| 404 """Find interesting elements from XML. |
| 405 |
| 406 This function tries to only look for specified elements |
| 407 without parsing the entire XML. The specified elements is better |
| 408 located near the beginning. |
| 409 |
| 410 Args: |
| 411 result: response XML. |
| 412 elements: a set of interesting element tags. |
| 413 |
| 414 Returns: |
| 415 A dict from element tag to element value. |
| 416 """ |
| 417 element_mapping = {} |
| 418 result = StringIO.StringIO(result) |
| 419 for _, e in ET.iterparse(result, events=('end',)): |
| 420 if not elements: |
| 421 break |
| 422 if e.tag in elements: |
| 423 element_mapping[e.tag] = e.text |
| 424 elements.remove(e.tag) |
| 425 return element_mapping |
| 426 |
| 427 |
| 428 def _get_storage_api(retry_params, account_id=None): |
| 429 """Returns storage_api instance for API methods. |
| 430 |
| 431 Args: |
| 432 retry_params: An instance of api_utils.RetryParams. |
| 433 account_id: Internal-use only. |
| 434 |
| 435 Returns: |
| 436 A storage_api instance to handle urlfetch work to GCS. |
| 437 On dev appserver, this instance by default will talk to a local stub |
| 438 unless common.ACCESS_TOKEN is set. That token will be used to talk |
| 439 to the real GCS. |
| 440 """ |
| 441 |
| 442 |
| 443 api = storage_api._StorageApi(storage_api._StorageApi.full_control_scope, |
| 444 service_account_id=account_id, |
| 445 retry_params=retry_params) |
| 446 if common.local_run() and not common.get_access_token(): |
| 447 api.api_url = common.local_api_url() |
| 448 if common.get_access_token(): |
| 449 api.token = common.get_access_token() |
| 450 return api |
| OLD | NEW |