Chromium Code Reviews

| OLD | NEW |
|---|---|
| 1 #!/usr/bin/python | 1 #!/usr/bin/python |
| 2 | 2 |
| 3 # pylint: disable=C0301 | 3 # pylint: disable=C0301 |
| 4 """ | 4 """ |
| 5 Copyright 2014 Google Inc. | 5 Copyright 2014 Google Inc. |
| 6 | 6 |
| 7 Use of this source code is governed by a BSD-style license that can be | 7 Use of this source code is governed by a BSD-style license that can be |
| 8 found in the LICENSE file. | 8 found in the LICENSE file. |
| 9 | 9 |
| 10 Utilities for accessing Google Cloud Storage, using the boto library (wrapper | 10 Utilities for accessing Google Cloud Storage, using the boto library (wrapper |
| 11 for the XML API). | 11 for the XML API). |
| 12 | 12 |
| 13 API/library references: | 13 API/library references: |
| 14 - https://developers.google.com/storage/docs/reference-guide | 14 - https://developers.google.com/storage/docs/reference-guide |
| 15 - http://googlecloudstorage.blogspot.com/2012/09/google-cloud-storage-tutorial-using-boto.html | 15 - http://googlecloudstorage.blogspot.com/2012/09/google-cloud-storage-tutorial-using-boto.html |
| 16 """ | 16 """ |
| 17 # pylint: enable=C0301 | 17 # pylint: enable=C0301 |
| 18 | 18 |
| 19 # System-level imports | 19 # System-level imports |
| 20 import errno | 20 import errno |
| 21 import hashlib | 21 import hashlib |
| | 22 import logging |
| 22 import os | 23 import os |
| 23 import posixpath | 24 import posixpath |
| 24 import re | 25 import re |
| 25 import sys | 26 import sys |
| | 27 from Queue import Queue |
| | 28 from threading import Thread |
| 26 | 29 |
| 27 # Imports from third-party code | 30 # Imports from third-party code |
| 28 TRUNK_DIRECTORY = os.path.abspath(os.path.join( | 31 TRUNK_DIRECTORY = os.path.abspath(os.path.join( |
| 29 os.path.dirname(__file__), os.pardir, os.pardir)) | 32 os.path.dirname(__file__), os.pardir, os.pardir)) |
| 30 for import_subdir in ['boto']: | 33 for import_subdir in ['boto']: |
| 31 import_dirpath = os.path.join( | 34 import_dirpath = os.path.join( |
| 32 TRUNK_DIRECTORY, 'third_party', 'externals', import_subdir) | 35 TRUNK_DIRECTORY, 'third_party', 'externals', import_subdir) |
| 33 if import_dirpath not in sys.path: | 36 if import_dirpath not in sys.path: |
| 34 # We need to insert at the beginning of the path, to make sure that our | 37 # We need to insert at the beginning of the path, to make sure that our |
| 35 # imported versions are favored over others that might be in the path. | 38 # imported versions are favored over others that might be in the path. |
| (...skipping 73 matching lines...) | |
| 109 take longer than just uploading the file. | 112 take longer than just uploading the file. |
| 110 See http://skbug.com/2778 ('gs_utils: when uploading IF_NEW, batch up | 113 See http://skbug.com/2778 ('gs_utils: when uploading IF_NEW, batch up |
| 111 checks for existing files within a single remote directory') | 114 checks for existing files within a single remote directory') |
| 112 """ | 115 """ |
| 113 ALWAYS = 1 # always upload the file | 116 ALWAYS = 1 # always upload the file |
| 114 IF_NEW = 2 # if there is an existing file with the same name, | 117 IF_NEW = 2 # if there is an existing file with the same name, |
| 115 # leave it alone | 118 # leave it alone |
| 116 IF_MODIFIED = 3 # if there is an existing file with the same name and | 119 IF_MODIFIED = 3 # if there is an existing file with the same name and |
| 117 # contents, leave it alone | 120 # contents, leave it alone |
| 118 | 121 |
| 119 def __init__(self, boto_file_path=None): | 122 def __init__(self, boto_file_path=None, logger=None): |
| 120 """Constructor. | 123 """Constructor. |
| 121 | 124 |
| 122 Params: | 125 Params: |
| 123 boto_file_path: full path (local-OS-style) on local disk where .boto | 126 boto_file_path: full path (local-OS-style) on local disk where .boto |
| 124 credentials file can be found. If None, then the GSUtils object | 127 credentials file can be found. If None, then the GSUtils object |
| 125 created will be able to access only public files in Google Storage. | 128 created will be able to access only public files in Google Storage. |
| | 129 logger: a logging.Logger instance to use for logging output; if None, |
| | 130 one will be created with default characteristics |
| 126 | 131 |
| 127 Raises an exception if no file is found at boto_file_path, or if the file | 132 Raises an exception if no file is found at boto_file_path, or if the file |
| 128 found there is malformed. | 133 found there is malformed. |
| 129 """ | 134 """ |
| | 135 self.logger = logger or logging.getLogger(__name__) |
| 130 self._gs_access_key_id = None | 136 self._gs_access_key_id = None |
| 131 self._gs_secret_access_key = None | 137 self._gs_secret_access_key = None |
| 132 if boto_file_path: | 138 if boto_file_path: |
| 133 print 'Reading boto file from %s' % boto_file_path | 139 self.logger.info('Reading boto file from %s' % boto_file_path) |
| 134 boto_dict = _config_file_as_dict(filepath=boto_file_path) | 140 boto_dict = _config_file_as_dict(filepath=boto_file_path) |
| 135 self._gs_access_key_id = boto_dict['gs_access_key_id'] | 141 self._gs_access_key_id = boto_dict['gs_access_key_id'] |
| 136 self._gs_secret_access_key = boto_dict['gs_secret_access_key'] | 142 self._gs_secret_access_key = boto_dict['gs_secret_access_key'] |
| 137 # Which field we get/set in ACL entries, depending on IdType. | 143 # Which field we get/set in ACL entries, depending on IdType. |
| 138 self._field_by_id_type = { | 144 self._field_by_id_type = { |
| 139 self.IdType.GROUP_BY_DOMAIN: 'domain', | 145 self.IdType.GROUP_BY_DOMAIN: 'domain', |
| 140 self.IdType.GROUP_BY_EMAIL: 'email_address', | 146 self.IdType.GROUP_BY_EMAIL: 'email_address', |
| 141 self.IdType.GROUP_BY_ID: 'id', | 147 self.IdType.GROUP_BY_ID: 'id', |
| 142 self.IdType.USER_BY_EMAIL: 'email_address', | 148 self.IdType.USER_BY_EMAIL: 'email_address', |
| 143 self.IdType.USER_BY_ID: 'id', | 149 self.IdType.USER_BY_ID: 'id', |
| (...skipping 64 matching lines...) | |
| 208 so that HTTP downloads of the file would be unzipped automatically. | 214 so that HTTP downloads of the file would be unzipped automatically. |
| 209 See https://developers.google.com/storage/docs/gsutil/addlhelp/ | 215 See https://developers.google.com/storage/docs/gsutil/addlhelp/ |
| 210 WorkingWithObjectMetadata#content-encoding | 216 WorkingWithObjectMetadata#content-encoding |
| 211 """ | 217 """ |
| 212 b = self._connect_to_bucket(bucket=dest_bucket) | 218 b = self._connect_to_bucket(bucket=dest_bucket) |
| 213 local_md5 = None # filled in lazily | 219 local_md5 = None # filled in lazily |
| 214 | 220 |
| 215 if upload_if == self.UploadIf.IF_NEW: | 221 if upload_if == self.UploadIf.IF_NEW: |
| 216 old_key = b.get_key(key_name=dest_path) | 222 old_key = b.get_key(key_name=dest_path) |
| 217 if old_key: | 223 if old_key: |
| 218 print 'Skipping upload of existing file gs://%s/%s' % ( | 224 self.logger.info('Skipping upload of existing file gs://%s/%s' % ( |
| 219 b.name, dest_path) | 225 b.name, dest_path)) |
| 220 return | 226 return |
| 221 elif upload_if == self.UploadIf.IF_MODIFIED: | 227 elif upload_if == self.UploadIf.IF_MODIFIED: |
| 222 old_key = b.get_key(key_name=dest_path) | 228 old_key = b.get_key(key_name=dest_path) |
| 223 if old_key: | 229 if old_key: |
| 224 if not local_md5: | 230 if not local_md5: |
| 225 local_md5 = _get_local_md5(path=source_path) | 231 local_md5 = _get_local_md5(path=source_path) |
| 226 if ('"%s"' % local_md5) == old_key.etag: | 232 if ('"%s"' % local_md5) == old_key.etag: |
| 227 print 'Skipping upload of unmodified file gs://%s/%s : %s' % ( | 233 self.logger.info( |
| 228 b.name, dest_path, local_md5) | 234 'Skipping upload of unmodified file gs://%s/%s : %s' % ( |
| | 235 b.name, dest_path, local_md5)) |
| 229 return | 236 return |
| 230 elif upload_if != self.UploadIf.ALWAYS: | 237 elif upload_if != self.UploadIf.ALWAYS: |
| 231 raise Exception('unknown value of upload_if: %s' % upload_if) | 238 raise Exception('unknown value of upload_if: %s' % upload_if) |
| 232 | 239 |
| 233 # Upload the file using a temporary name at first, in case the transfer | 240 # Upload the file using a temporary name at first, in case the transfer |
| 234 # is interrupted partway through. | 241 # is interrupted partway through. |
| 235 if not local_md5: | 242 if not local_md5: |
| 236 local_md5 = _get_local_md5(path=source_path) | 243 local_md5 = _get_local_md5(path=source_path) |
| 237 initial_key = Key(b) | 244 initial_key = Key(b) |
| 238 initial_key.name = dest_path + '-uploading-' + local_md5 | 245 initial_key.name = dest_path + '-uploading-' + local_md5 |
| (...skipping 34 matching lines...) | |
| 273 | 280 |
| 274 # Set ACLs on the file. | 281 # Set ACLs on the file. |
| 275 # We do this *after* copy_key(), because copy_key's preserve_acl | 282 # We do this *after* copy_key(), because copy_key's preserve_acl |
| 276 # functionality would incur a performance hit. | 283 # functionality would incur a performance hit. |
| 277 for (id_type, id_value, permission) in fine_grained_acl_list or []: | 284 for (id_type, id_value, permission) in fine_grained_acl_list or []: |
| 278 self.set_acl( | 285 self.set_acl( |
| 279 bucket=b, path=final_key.name, | 286 bucket=b, path=final_key.name, |
| 280 id_type=id_type, id_value=id_value, permission=permission) | 287 id_type=id_type, id_value=id_value, permission=permission) |
| 281 | 288 |
| 282 def upload_dir_contents(self, source_dir, dest_bucket, dest_dir, | 289 def upload_dir_contents(self, source_dir, dest_bucket, dest_dir, |
| 283 upload_if=UploadIf.ALWAYS, **kwargs): | 290 num_threads=10, upload_if=UploadIf.ALWAYS, **kwargs): |
| 284 """Recursively upload contents of a local directory to Google Storage. | 291 """Recursively upload contents of a local directory to Google Storage. |
| 285 | 292 |
| 286 params: | 293 params: |
| 287 source_dir: full path (local-OS-style) on local disk of directory to copy | 294 source_dir: full path (local-OS-style) on local disk of directory to copy |
| 288 contents of | 295 contents of |
| 289 dest_bucket: GS bucket to copy the files into | 296 dest_bucket: GS bucket to copy the files into |
| 290 dest_dir: full path (Posix-style) within that bucket; write the files into | 297 dest_dir: full path (Posix-style) within that bucket; write the files into |
| 291 this directory. If None, write into the root directory of the bucket. | 298 this directory. If None, write into the root directory of the bucket. |
| | 299 num_threads: how many files to upload at once |
| 292 upload_if: one of the UploadIf values, describing in which cases we should | 300 upload_if: one of the UploadIf values, describing in which cases we should |
| 293 upload the file | 301 upload the file |
| 294 kwargs: any additional keyword arguments "inherited" from upload_file() | 302 kwargs: any additional keyword arguments "inherited" from upload_file() |
| 295 | 303 |
| 296 The copy operates as a merge: any files in source_dir will be "overlaid" on | 304 The copy operates as a merge: any files in source_dir will be "overlaid" on |
| 297 top of the existing content in dest_dir. Existing files with the same names | 305 top of the existing content in dest_dir. Existing files with the same names |
| 298 may or may not be overwritten, depending on the value of upload_if. | 306 may or may not be overwritten, depending on the value of upload_if. |
| 299 | 307 |
| 300 TODO(epoger): Upload multiple files simultaneously to reduce latency. | 308 TODO(epoger): Upload multiple files simultaneously to reduce latency. |
| 301 """ | 309 """ |
| 302 b = self._connect_to_bucket(bucket=dest_bucket) | 310 b = self._connect_to_bucket(bucket=dest_bucket) |
| 303 if not dest_dir: | 311 if not dest_dir: |
| 304 dest_dir = '' | 312 dest_dir = '' |
| 305 | 313 |
| 306 # Create a set of all files within source_dir. | 314 # Create a set of all files within source_dir. |
| 307 source_fileset = set() | 315 source_fileset = set() |
| 308 prefix_length = len(source_dir)+1 | 316 prefix_length = len(source_dir)+1 |
| 309 for dirpath, _, filenames in os.walk(source_dir): | 317 for dirpath, _, filenames in os.walk(source_dir): |
| 310 relative_dirpath = dirpath[prefix_length:] | 318 relative_dirpath = dirpath[prefix_length:] |
| 311 for filename in filenames: | 319 for filename in filenames: |
| 312 source_fileset.add(os.path.join(relative_dirpath, filename)) | 320 source_fileset.add(os.path.join(relative_dirpath, filename)) |
| | 321 num_files_total = len(source_fileset) |
| 313 | 322 |
| 314 # If we are only uploading files conditionally, remove any unnecessary | 323 # If we are only uploading files conditionally, remove any unnecessary |
| 315 # files from source_fileset. | 324 # files from source_fileset. |
| 316 if upload_if == self.UploadIf.ALWAYS: | 325 if upload_if == self.UploadIf.ALWAYS: |
| 317 pass # there are no shortcuts... upload them all | 326 pass # there are no shortcuts... upload them all |
| 318 else: | 327 else: |
| 319 # Create a mapping of filename to Key for existing files within dest_dir | 328 # Create a mapping of filename to Key for existing files within dest_dir |
| 320 existing_dest_filemap = {} | 329 existing_dest_filemap = {} |
| 321 prefix = dest_dir | 330 prefix = dest_dir |
| 322 if prefix and not prefix.endswith('/'): | 331 if prefix and not prefix.endswith('/'): |
| (...skipping 12 matching lines...) | |
| 335 elif upload_if == self.UploadIf.IF_MODIFIED: | 344 elif upload_if == self.UploadIf.IF_MODIFIED: |
| 336 for rel_path in files_in_common: | 345 for rel_path in files_in_common: |
| 337 local_md5 = '"%s"' % _get_local_md5(path=os.path.join( | 346 local_md5 = '"%s"' % _get_local_md5(path=os.path.join( |
| 338 source_dir, rel_path)) | 347 source_dir, rel_path)) |
| 339 key = existing_dest_filemap[rel_path] | 348 key = existing_dest_filemap[rel_path] |
| 340 if local_md5 == key.etag: | 349 if local_md5 == key.etag: |
| 341 source_fileset.remove(rel_path) | 350 source_fileset.remove(rel_path) |
| 342 else: | 351 else: |
| 343 raise Exception('unknown value of upload_if: %s' % upload_if) | 352 raise Exception('unknown value of upload_if: %s' % upload_if) |
| 344 | 353 |
| | 354 # Set up a counter of files that have been uploaded. |
| | 355 # Python's Global Interpreter Lock should make this thread-safe; see |
| | 356 # http://www.gossamer-threads.com/lists/python/dev/273403 |
| | 357 num_files_to_upload = len(source_fileset) |
| | 358 atomic_incrementer = iter(xrange(1, num_files_to_upload+1)).next |
| | 359 |
| 345 # Upload any files still in source_fileset. | 360 # Upload any files still in source_fileset. |
| 346 for rel_path in sorted(source_fileset): | 361 self.logger.info('Uploading %d files, skipping %d ...' % ( |
| 347 self.upload_file( | 362 num_files_to_upload, num_files_total - num_files_to_upload)) |
| 348 source_path=os.path.join(source_dir, rel_path), | 363 if num_files_to_upload == 0: |
| 349 dest_bucket=b, | 364 return |
| 350 dest_path=posixpath.join(dest_dir, rel_path), | 365 if num_threads > num_files_to_upload: |
| 351 upload_if=self.UploadIf.ALWAYS, | 366 num_threads = num_files_to_upload |
|
borenet
2014/07/28 13:32:11
Maybe line 358 should be moved here so that it doe
epoger
2014/07/28 14:14:13
Done. I don't think there is significant performa
| |
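The counter introduced at new line 358 leans on CPython's GIL: a single call to a built-in iterator's .next() method is atomic, so concurrent workers each receive a distinct sequential value without an explicit lock. Below is a minimal sketch of that idiom next to a lock-based equivalent, for illustration only; none of these names come from the CL.

```python
# Illustration only -- not part of the CL under review (Python 2, to match the file).
from threading import Lock

# Idiom used at new line 358: each .next() call hands out the next integer
# atomically under CPython's GIL, so no explicit locking is needed.
atomic_incrementer = iter(xrange(1, 100 + 1)).next

# The same counter with an explicit lock, if relying on GIL atomicity feels too subtle.
class LockedCounter(object):
  def __init__(self):
    self._value = 0
    self._lock = Lock()

  def next(self):
    with self._lock:
      self._value += 1
      return self._value
```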
| 352 **kwargs) | 367 q = Queue(maxsize=2*num_threads) |
| | 368 end_of_queue = object() |
| | 369 def worker(): |
| | 370 while True: |
| | 371 rel_path = q.get() |
| | 372 if rel_path is end_of_queue: |
| | 373 q.task_done() |
| | 374 return |
|
borenet
2014/07/28 13:32:11
Why not use q.get(False) or q.get_nowait() and try
epoger
2014/07/28 14:14:14
The problem with checking for an empty queue is th
borenet
2014/07/28 14:20:07
I was thinking of #2, since you're dealing with th
| |
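The thread above weighs two ways for the workers to learn that they are done: a blocking q.get() plus a sentinel object (what the CL does with end_of_queue), versus q.get_nowait() with an empty queue treated as "finished". A minimal sketch of both strategies, for illustration only; the risk with the second is that the queue can be momentarily empty while the producer is still enqueueing work.

```python
# Illustration only -- not part of the CL under review (Python 2, to match the file).
from Queue import Queue, Empty

q = Queue()
END = object()  # sentinel, playing the role of end_of_queue in the CL

def worker_with_sentinel():
  # Blocks until an item is available; exits only when the producer says so.
  while True:
    item = q.get()
    if item is END:
      q.task_done()
      return
    # ... process item ...
    q.task_done()

def worker_with_get_nowait():
  # Racy if the producer has not finished filling the queue: Empty only means
  # "nothing is available right now", not "no more work will ever arrive".
  while True:
    try:
      item = q.get_nowait()
    except Empty:
      return
    # ... process item ...
    q.task_done()
```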
| | 375 self.logger.info(' Uploading file %d/%d: %s' % ( |
| | 376 atomic_incrementer(), num_files_to_upload, rel_path)) |
| | 377 self.upload_file( |
| | 378 source_path=os.path.join(source_dir, rel_path), |
| | 379 dest_bucket=b, |
| | 380 dest_path=posixpath.join(dest_dir, rel_path), |
| | 381 upload_if=self.UploadIf.ALWAYS, |
| | 382 **kwargs) |
| | 383 q.task_done() |
| | 384 # Spin up the threads. |
| | 385 for _ in range(num_threads): |
| | 386 t = Thread(target=worker) |
| | 387 t.daemon = True |
| | 388 t.start() |
| | 389 # Start loading files into the work queue. |
| | 390 for rel_path in source_fileset: |
| | 391 q.put(rel_path) |
| | 392 # Tell all workers to go home. |
| | 393 for _ in range(num_threads): |
| | 394 q.put(end_of_queue) |
| | 395 # Block until all files have been uploaded and all workers have gone home. |
| | 396 q.join() |
| 353 | 397 |
| 354 def download_file(self, source_bucket, source_path, dest_path, | 398 def download_file(self, source_bucket, source_path, dest_path, |
| 355 create_subdirs_if_needed=False): | 399 create_subdirs_if_needed=False): |
| 356 """Downloads a single file from Google Cloud Storage to local disk. | 400 """Downloads a single file from Google Cloud Storage to local disk. |
| 357 | 401 |
| 358 Args: | 402 Args: |
| 359 source_bucket: GS bucket to download the file from | 403 source_bucket: GS bucket to download the file from |
| 360 source_path: full path (Posix-style) within that bucket | 404 source_path: full path (Posix-style) within that bucket |
| 361 dest_path: full path (local-OS-style) on local disk to copy the file to | 405 dest_path: full path (local-OS-style) on local disk to copy the file to |
| 362 create_subdirs_if_needed: boolean; whether to create subdirectories as | 406 create_subdirs_if_needed: boolean; whether to create subdirectories as |
| (...skipping 240 matching lines...) | |
| 603 | 647 |
| 604 def _get_local_md5(path): | 648 def _get_local_md5(path): |
| 605 """Returns the MD5 hash of a file on local disk.""" | 649 """Returns the MD5 hash of a file on local disk.""" |
| 606 hasher = hashlib.md5() | 650 hasher = hashlib.md5() |
| 607 with open(path, 'rb') as f: | 651 with open(path, 'rb') as f: |
| 608 while True: | 652 while True: |
| 609 data = f.read(64*1024) | 653 data = f.read(64*1024) |
| 610 if not data: | 654 if not data: |
| 611 return hasher.hexdigest() | 655 return hasher.hexdigest() |
| 612 hasher.update(data) | 656 hasher.update(data) |
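For context, here is a hypothetical caller of the API as it stands after this CL. The bucket name, local paths, .boto location, and import style are placeholders, not taken from the review; the constructor and upload_dir_contents signatures match the NEW side of the diff.

```python
# Hypothetical usage sketch -- paths, bucket name, and the import are assumptions.
import logging
import gs_utils  # assumes gs_utils.py is on sys.path

logging.basicConfig(level=logging.INFO)
gs = gs_utils.GSUtils(boto_file_path='/home/me/.boto',
                      logger=logging.getLogger('gs_upload'))

# Upload a local tree with 10 worker threads, skipping files whose MD5
# already matches the copy in Google Storage (UploadIf.IF_MODIFIED).
gs.upload_dir_contents(
    source_dir='/tmp/render-results',
    dest_bucket='my-test-bucket',
    dest_dir='results/2014-07-28',
    num_threads=10,
    upload_if=gs_utils.GSUtils.UploadIf.IF_MODIFIED)
```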