Index: py/utils/gs_utils.py
diff --git a/py/utils/gs_utils.py b/py/utils/gs_utils.py
index c58bc67a3c8fb189002a7358d0b894d51bdc11f3..7f3b59c058c4e3d44becbd434c6f14a90e31ab92 100644
--- a/py/utils/gs_utils.py
+++ b/py/utils/gs_utils.py
@@ -104,9 +104,9 @@ class GSUtils(object):
  class UploadIf:
    """Cases in which we will upload a file.

-    Beware of performance tradeoffs. E.g., if the file is small, the extra
-    round trip to check for file existence and/or checksum may take longer than
-    just uploading the file.
+    Beware of performance tradeoffs. E.g., if you are uploading just one small
+    file, the extra round trip to check for file existence and/or checksum may
+    take longer than simply uploading the file.
    See http://skbug.com/2778 ('gs_utils: when uploading IF_NEW, batch up
    checks for existing files within a single remote directory')
    """
@@ -244,7 +244,8 @@ class GSUtils(object):
        bucket=b, path=key.name,
        id_type=id_type, id_value=id_value, permission=permission)

-  def upload_dir_contents(self, source_dir, dest_bucket, dest_dir, **kwargs):
+  def upload_dir_contents(self, source_dir, dest_bucket, dest_dir,
+                          upload_if=UploadIf.ALWAYS, **kwargs):
    """Recursively upload contents of a local directory to Google Storage.

    params:
@@ -253,34 +254,67 @@
      dest_bucket: GS bucket to copy the files into
      dest_dir: full path (Posix-style) within that bucket; write the files into
          this directory. If None, write into the root directory of the bucket.
+      upload_if: one of the UploadIf values, describing in which cases we should
+          upload the file
      kwargs: any additional keyword arguments "inherited" from upload_file()

    The copy operates as a merge: any files in source_dir will be "overlaid" on
    top of the existing content in dest_dir. Existing files with the same names
-    may or may not be overwritten, depending on the value of the upload_if kwarg
-    inherited from upload_file().
+    may or may not be overwritten, depending on the value of upload_if.

    TODO(epoger): Upload multiple files simultaneously to reduce latency.
-
-    TODO(epoger): When upload_if==IF_NEW, batch up checks for existing files
-    within a single remote directory. See http://skbug.com/2778
    """
    b = self._connect_to_bucket(bucket=dest_bucket)
-    for filename in sorted(os.listdir(source_dir)):
-      local_path = os.path.join(source_dir, filename)
-      if dest_dir:
-        remote_path = posixpath.join(dest_dir, filename)
-      else:
-        remote_path = filename
-
-      if os.path.isdir(local_path):
-        self.upload_dir_contents(  # recurse
-            source_dir=local_path, dest_bucket=b, dest_dir=remote_path,
-            **kwargs)
+    if not dest_dir:
+      dest_dir = ''
+
+    # Create a set of all files within source_dir.
+    source_fileset = set()
+    prefix_length = len(source_dir) + 1
+    for dirpath, _, filenames in os.walk(source_dir):
+      relative_dirpath = dirpath[prefix_length:]
+      for filename in filenames:
+        source_fileset.add(os.path.join(relative_dirpath, filename))
+
+    # If we are only uploading files conditionally, remove any unnecessary
+    # files from source_fileset.
+    if upload_if == self.UploadIf.ALWAYS:
+      pass  # there are no shortcuts... upload them all
+    else:
+      # Create a mapping of filename to Key for existing files within dest_dir.
+      existing_dest_filemap = {}
+      prefix = dest_dir
+      if prefix and not prefix.endswith('/'):
+        prefix += '/'
+      prefix_length = len(prefix)
+      items = BucketListResultSet(bucket=b, prefix=prefix)
+      for item in items:
+        if type(item) is Key:
+          existing_dest_filemap[item.name[prefix_length:]] = item
+
+      # Now, depending on upload_if, trim files we should skip uploading.
+      files_in_common = source_fileset.intersection(
+          existing_dest_filemap.keys())
+      if upload_if == self.UploadIf.IF_NEW:
+        source_fileset -= files_in_common
+      elif upload_if == self.UploadIf.IF_MODIFIED:
+        for rel_path in files_in_common:
+          local_md5 = '"%s"' % _get_local_md5(path=os.path.join(
+              source_dir, rel_path))
+          key = existing_dest_filemap[rel_path]
+          if local_md5 == key.etag:
+            source_fileset.remove(rel_path)
      else:
-        self.upload_file(
-            source_path=local_path, dest_bucket=b, dest_path=remote_path,
-            **kwargs)
+        raise Exception('unknown value of upload_if: %s' % upload_if)
+
+    # Upload any files still in source_fileset.
+    for rel_path in sorted(source_fileset):
+      self.upload_file(
+          source_path=os.path.join(source_dir, rel_path),
+          dest_bucket=b,
+          dest_path=posixpath.join(dest_dir, rel_path),
+          upload_if=self.UploadIf.ALWAYS,
+          **kwargs)

  def download_file(self, source_bucket, source_path, dest_path,
                    create_subdirs_if_needed=False):
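
[Not part of the patch: two implementation notes on the hunk above. First,
the '"%s"' wrapping of the local MD5 digest matches boto's behavior of
storing Key.etag with the surrounding double quotes from the HTTP ETag
header, so the local value must be quoted before comparison. A minimal
sketch of that comparison, using the well-known MD5 of the empty string:]

    import hashlib

    # Quote the local digest the same way boto reports Key.etag.
    local_md5 = '"%s"' % hashlib.md5('').hexdigest()
    key_etag = '"d41d8cd98f00b204e9800998ecf8427e"'  # as stored by Key.etag
    assert local_md5 == key_etag

[Second, BucketListResultSet (from boto.s3.bucketlistresultset) iterates
lazily over paginated list requests, so building existing_dest_filemap costs
a few list requests for the whole dest_dir subtree rather than one existence
check per file; this is the batching that http://skbug.com/2778 asked for.]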