Index: py/utils/gs_utils.py
diff --git a/py/utils/gs_utils.py b/py/utils/gs_utils.py
index c58bc67a3c8fb189002a7358d0b894d51bdc11f3..7f3b59c058c4e3d44becbd434c6f14a90e31ab92 100644
--- a/py/utils/gs_utils.py
+++ b/py/utils/gs_utils.py
@@ -104,9 +104,9 @@ class GSUtils(object):
   class UploadIf:
     """Cases in which we will upload a file.
 
-    Beware of performance tradeoffs. E.g., if the file is small, the extra
-    round trip to check for file existence and/or checksum may take longer than
-    just uploading the file.
+    Beware of performance tradeoffs. E.g., if you are uploading just one small
+    file, the extra round trip to check for file existence and/or checksum may
+    take longer than just uploading the file.
     See http://skbug.com/2778 ('gs_utils: when uploading IF_NEW, batch up
     checks for existing files within a single remote directory')
     """
@@ -244,7 +244,8 @@ class GSUtils(object):
           bucket=b, path=key.name,
           id_type=id_type, id_value=id_value, permission=permission)
 
-  def upload_dir_contents(self, source_dir, dest_bucket, dest_dir, **kwargs):
+  def upload_dir_contents(self, source_dir, dest_bucket, dest_dir,
+                          upload_if=UploadIf.ALWAYS, **kwargs):
     """Recursively upload contents of a local directory to Google Storage.
 
     params:
@@ -253,34 +254,67 @@ class GSUtils(object):
       dest_bucket: GS bucket to copy the files into
       dest_dir: full path (Posix-style) within that bucket; write the files into
         this directory. If None, write into the root directory of the bucket.
+      upload_if: one of the UploadIf values, describing in which cases we should
+        upload the file
       kwargs: any additional keyword arguments "inherited" from upload_file()
 
     The copy operates as a merge: any files in source_dir will be "overlaid" on
     top of the existing content in dest_dir. Existing files with the same names
-    may or may not be overwritten, depending on the value of the upload_if kwarg
-    inherited from upload_file().
+    may or may not be overwritten, depending on the value of upload_if.
 
     TODO(epoger): Upload multiple files simultaneously to reduce latency.
-
-    TODO(epoger): When upload_if==IF_NEW, batch up checks for existing files
-    within a single remote directory. See http://skbug.com/2778
     """
     b = self._connect_to_bucket(bucket=dest_bucket)
-    for filename in sorted(os.listdir(source_dir)):
-      local_path = os.path.join(source_dir, filename)
-      if dest_dir:
-        remote_path = posixpath.join(dest_dir, filename)
-      else:
-        remote_path = filename
-
-      if os.path.isdir(local_path):
-        self.upload_dir_contents(  # recurse
-            source_dir=local_path, dest_bucket=b, dest_dir=remote_path,
-            **kwargs)
+    if not dest_dir:
+      dest_dir = ''
+
+    # Create a set of all files within source_dir.
+    source_fileset = set()
+    prefix_length = len(source_dir)+1
+    for dirpath, _, filenames in os.walk(source_dir):
+      relative_dirpath = dirpath[prefix_length:]
+      for filename in filenames:
+        source_fileset.add(os.path.join(relative_dirpath, filename))
+
+    # If we are only uploading files conditionally, remove any unnecessary
+    # files from source_fileset.
+    if upload_if == self.UploadIf.ALWAYS:
+      pass  # there are no shortcuts... upload them all
+    else:
+      # Create a mapping of filename to Key for existing files within dest_dir
+      existing_dest_filemap = {}
+      prefix = dest_dir
+      if prefix and not prefix.endswith('/'):
+        prefix += '/'
+      prefix_length = len(prefix)
+      items = BucketListResultSet(bucket=b, prefix=prefix)
+      for item in items:
+        if type(item) is Key:
+          existing_dest_filemap[item.name[prefix_length:]] = item
+
+      # Now, depending on upload_if, trim files we should skip uploading.
+      files_in_common = source_fileset.intersection(
+          existing_dest_filemap.keys())
+      if upload_if == self.UploadIf.IF_NEW:
+        source_fileset -= files_in_common
+      elif upload_if == self.UploadIf.IF_MODIFIED:
+        for rel_path in files_in_common:
+          local_md5 = '"%s"' % _get_local_md5(path=os.path.join(
+              source_dir, rel_path))
+          key = existing_dest_filemap[rel_path]
+          if local_md5 == key.etag:
+            source_fileset.remove(rel_path)
       else:
-        self.upload_file(
-            source_path=local_path, dest_bucket=b, dest_path=remote_path,
-            **kwargs)
+        raise Exception('unknown value of upload_if: %s' % upload_if)
+
+    # Upload any files still in source_fileset.
+    for rel_path in sorted(source_fileset):
+      self.upload_file(
+          source_path=os.path.join(source_dir, rel_path),
+          dest_bucket=b,
+          dest_path=posixpath.join(dest_dir, rel_path),
+          upload_if=self.UploadIf.ALWAYS,
+          **kwargs)
 
   def download_file(self, source_bucket, source_path, dest_path,
                     create_subdirs_if_needed=False):
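
For reference, the UploadIf docstring edited in the first hunk belongs to a small enum-like holder class nested in GSUtils; its constants are not shown in this patch. Based on the three names the patch references (ALWAYS, IF_NEW, IF_MODIFIED), the class presumably looks something like the sketch below; the numeric values and comments are assumptions, and only the names come from the diff (docstring abbreviated here).

    class UploadIf:
      """Cases in which we will upload a file."""
      ALWAYS = 1       # assumed value: upload unconditionally
      IF_NEW = 2       # assumed value: skip files that already exist remotely
      IF_MODIFIED = 3  # assumed value: skip files whose remote ETag matches the local MD5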
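A minimal usage sketch of the new upload_if parameter. Everything here except upload_dir_contents() and UploadIf is a hypothetical placeholder: the import path, the GSUtils constructor arguments (not shown in this diff), and the bucket/directory names.

    from utils import gs_utils  # hypothetical import path

    gs = gs_utils.GSUtils()  # constructor arguments, if any, are not shown here

    # Merge a local directory into the bucket, skipping any remote file whose
    # ETag already matches the MD5 checksum of the corresponding local file.
    gs.upload_dir_contents(
        source_dir='/tmp/output',        # hypothetical local directory
        dest_bucket='example-bucket',    # hypothetical bucket name
        dest_dir='results/latest',       # hypothetical remote directory
        upload_if=gs_utils.GSUtils.UploadIf.IF_MODIFIED)

With IF_MODIFIED, the call costs one LIST request over dest_dir plus a local MD5 computation for each file that already exists remotely, instead of one unconditional upload per file; as the UploadIf docstring warns, that trade only pays off when the uploads it avoids outweigh the extra round trip.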
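The IF_MODIFIED branch compares boto's Key.etag against _get_local_md5(), a helper defined elsewhere in gs_utils.py and not shown in these hunks. A sketch of what such a helper presumably does, assuming it returns a plain MD5 hexdigest of the file contents (the chunked read is an implementation guess):

    import hashlib

    def _get_local_md5(path):
      """Returns the MD5 hexdigest of the file at the given local path."""
      hasher = hashlib.md5()
      with open(path, 'rb') as f:
        # Hash in fixed-size chunks so large files need not fit in memory.
        for chunk in iter(lambda: f.read(64 * 1024), b''):
          hasher.update(chunk)
      return hasher.hexdigest()

Hence the '"%s"' wrapping in the patch: boto reports Key.etag with its surrounding double quotes intact, so the local digest must be quoted before comparison. The comparison also assumes the ETag really is an MD5 of the object's contents, which holds for ordinary single-request uploads but not, e.g., for composite objects.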