Chromium Code Reviews
Diff: py/utils/gs_utils.py

Issue 424553002: upload_dir_contents(): upload multiple files in parallel (Closed). Base URL: https://skia.googlesource.com/common.git@master
Patch Set: replace prints with logging (created 6 years, 4 months ago)
 OLD  NEW
   1    1    #!/usr/bin/python
   2    2
   3    3    # pylint: disable=C0301
   4    4    """
   5    5    Copyright 2014 Google Inc.
   6    6
   7    7    Use of this source code is governed by a BSD-style license that can be
   8    8    found in the LICENSE file.
   9    9
  10   10    Utilities for accessing Google Cloud Storage, using the boto library (wrapper
  11   11    for the XML API).
  12   12
  13   13    API/library references:
  14   14    - https://developers.google.com/storage/docs/reference-guide
  15   15    - http://googlecloudstorage.blogspot.com/2012/09/google-cloud-storage-tutorial-using-boto.html
  16   16    """
  17   17    # pylint: enable=C0301
  18   18
  19   19    # System-level imports
  20   20    import errno
  21   21    import hashlib
       22  + import logging
  22   23    import os
  23   24    import posixpath
  24   25    import re
  25   26    import sys
       27  + from Queue import Queue
       28  + from threading import Thread
  26   29
  27   30    # Imports from third-party code
  28   31    TRUNK_DIRECTORY = os.path.abspath(os.path.join(
  29   32        os.path.dirname(__file__), os.pardir, os.pardir))
  30   33    for import_subdir in ['boto']:
  31   34      import_dirpath = os.path.join(
  32   35          TRUNK_DIRECTORY, 'third_party', 'externals', import_subdir)
  33   36      if import_dirpath not in sys.path:
  34   37        # We need to insert at the beginning of the path, to make sure that our
  35   38        # imported versions are favored over others that might be in the path.
(...skipping 73 matching lines...)
 109  112        take longer than just uploading the file.
 110  113        See http://skbug.com/2778 ('gs_utils: when uploading IF_NEW, batch up
 111  114        checks for existing files within a single remote directory')
 112  115        """
 113  116      ALWAYS = 1       # always upload the file
 114  117      IF_NEW = 2       # if there is an existing file with the same name,
 115  118                       #   leave it alone
 116  119      IF_MODIFIED = 3  # if there is an existing file with the same name and
 117  120                       #   contents, leave it alone
 118  121
 119       -   def __init__(self, boto_file_path=None):
      122  +   def __init__(self, boto_file_path=None, logger=None):
 120  123      """Constructor.
 121  124
 122  125      Params:
 123  126        boto_file_path: full path (local-OS-style) on local disk where .boto
 124  127            credentials file can be found. If None, then the GSUtils object
 125  128            created will be able to access only public files in Google Storage.
      129  +       logger: a logging.Logger instance to use for logging output; if None,
      130  +           one will be created with default characteristics
 126  131
 127  132      Raises an exception if no file is found at boto_file_path, or if the file
 128  133      found there is malformed.
 129  134      """
      135  +     self.logger = logger or logging.getLogger(__name__)
 130  136      self._gs_access_key_id = None
 131  137      self._gs_secret_access_key = None
 132  138      if boto_file_path:
 133       -       print 'Reading boto file from %s' % boto_file_path
      139  +       self.logger.info('Reading boto file from %s' % boto_file_path)
 134  140        boto_dict = _config_file_as_dict(filepath=boto_file_path)
 135  141        self._gs_access_key_id = boto_dict['gs_access_key_id']
 136  142        self._gs_secret_access_key = boto_dict['gs_secret_access_key']
 137  143      # Which field we get/set in ACL entries, depending on IdType.
 138  144      self._field_by_id_type = {
 139  145          self.IdType.GROUP_BY_DOMAIN: 'domain',
 140  146          self.IdType.GROUP_BY_EMAIL: 'email_address',
 141  147          self.IdType.GROUP_BY_ID: 'id',
 142  148          self.IdType.USER_BY_EMAIL: 'email_address',
 143  149          self.IdType.USER_BY_ID: 'id',
(...skipping 64 matching lines...)
 208  214          so that HTTP downloads of the file would be unzipped automatically.
 209  215          See https://developers.google.com/storage/docs/gsutil/addlhelp/
 210  216          WorkingWithObjectMetadata#content-encoding
 211  217      """
 212  218      b = self._connect_to_bucket(bucket=dest_bucket)
 213  219      local_md5 = None  # filled in lazily
 214  220
 215  221      if upload_if == self.UploadIf.IF_NEW:
 216  222        old_key = b.get_key(key_name=dest_path)
 217  223        if old_key:
 218       -         print 'Skipping upload of existing file gs://%s/%s' % (
 219       -             b.name, dest_path)
      224  +         self.logger.info('Skipping upload of existing file gs://%s/%s' % (
      225  +             b.name, dest_path))
 220  226          return
 221  227      elif upload_if == self.UploadIf.IF_MODIFIED:
 222  228        old_key = b.get_key(key_name=dest_path)
 223  229        if old_key:
 224  230          if not local_md5:
 225  231            local_md5 = _get_local_md5(path=source_path)
 226  232          if ('"%s"' % local_md5) == old_key.etag:
 227       -           print 'Skipping upload of unmodified file gs://%s/%s : %s' % (
 228       -               b.name, dest_path, local_md5)
      233  +           self.logger.info(
      234  +               'Skipping upload of unmodified file gs://%s/%s : %s' % (
      235  +                   b.name, dest_path, local_md5))
 229  236            return
 230  237      elif upload_if != self.UploadIf.ALWAYS:
 231  238        raise Exception('unknown value of upload_if: %s' % upload_if)
 232  239
 233  240      # Upload the file using a temporary name at first, in case the transfer
 234  241      # is interrupted partway through.
 235  242      if not local_md5:
 236  243        local_md5 = _get_local_md5(path=source_path)
 237  244      initial_key = Key(b)
 238  245      initial_key.name = dest_path + '-uploading-' + local_md5
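The IF_MODIFIED branch above relies on the fact that, for plain (non-composite) Google Cloud Storage objects, the ETag reported through the XML API is the object's hex MD5 digest wrapped in double quotes, so it can be compared directly against a locally computed hash. A small sketch of that comparison, mirroring _get_local_md5 at the bottom of this file (helper names are illustrative):

# Sketch: why comparing a boto Key's etag against a local MD5 works for
# plain (non-composite) GCS objects: the ETag is the quoted hex MD5 digest,
# e.g. '"d41d8cd98f00b204e9800998ecf8427e"'.
import hashlib

def _local_md5(path):
  hasher = hashlib.md5()
  with open(path, 'rb') as f:
    while True:
      data = f.read(64 * 1024)
      if not data:
        return hasher.hexdigest()
      hasher.update(data)

def is_unmodified(key, local_path):
  # key is a boto Key for the existing remote object; the quoting must match.
  return ('"%s"' % _local_md5(local_path)) == key.etag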
(...skipping 34 matching lines...)
 273  280
 274  281      # Set ACLs on the file.
 275  282      # We do this *after* copy_key(), because copy_key's preserve_acl
 276  283      # functionality would incur a performance hit.
 277  284      for (id_type, id_value, permission) in fine_grained_acl_list or []:
 278  285        self.set_acl(
 279  286            bucket=b, path=final_key.name,
 280  287            id_type=id_type, id_value=id_value, permission=permission)
 281  288
 282  289    def upload_dir_contents(self, source_dir, dest_bucket, dest_dir,
 283       -                           upload_if=UploadIf.ALWAYS, **kwargs):
      290  +                           num_threads=10, upload_if=UploadIf.ALWAYS, **kwargs):
 284  291      """Recursively upload contents of a local directory to Google Storage.
 285  292
 286  293      params:
 287  294        source_dir: full path (local-OS-style) on local disk of directory to copy
 288  295            contents of
 289  296        dest_bucket: GS bucket to copy the files into
 290  297        dest_dir: full path (Posix-style) within that bucket; write the files into
 291  298            this directory. If None, write into the root directory of the bucket.
      299  +       num_threads: how many files to upload at once
 292  300        upload_if: one of the UploadIf values, describing in which cases we should
 293  301            upload the file
 294  302        kwargs: any additional keyword arguments "inherited" from upload_file()
 295  303
 296  304      The copy operates as a merge: any files in source_dir will be "overlaid" on
 297  305      top of the existing content in dest_dir. Existing files with the same names
 298  306      may or may not be overwritten, depending on the value of upload_if.
 299  307
 300  308      TODO(epoger): Upload multiple files simultaneously to reduce latency.
 301  309      """
 302  310      b = self._connect_to_bucket(bucket=dest_bucket)
 303  311      if not dest_dir:
 304  312        dest_dir = ''
 305  313
 306  314      # Create a set of all files within source_dir.
 307  315      source_fileset = set()
 308  316      prefix_length = len(source_dir)+1
 309  317      for dirpath, _, filenames in os.walk(source_dir):
 310  318        relative_dirpath = dirpath[prefix_length:]
 311  319        for filename in filenames:
 312  320          source_fileset.add(os.path.join(relative_dirpath, filename))
      321  +     num_files_total = len(source_fileset)
 313  322
 314  323      # If we are only uploading files conditionally, remove any unnecessary
 315  324      # files from source_fileset.
 316  325      if upload_if == self.UploadIf.ALWAYS:
 317  326        pass  # there are no shortcuts... upload them all
 318  327      else:
 319  328        # Create a mapping of filename to Key for existing files within dest_dir
 320  329        existing_dest_filemap = {}
 321  330        prefix = dest_dir
 322  331        if prefix and not prefix.endswith('/'):
(...skipping 12 matching lines...)
 335  344      elif upload_if == self.UploadIf.IF_MODIFIED:
 336  345        for rel_path in files_in_common:
 337  346          local_md5 = '"%s"' % _get_local_md5(path=os.path.join(
 338  347              source_dir, rel_path))
 339  348          key = existing_dest_filemap[rel_path]
 340  349          if local_md5 == key.etag:
 341  350            source_fileset.remove(rel_path)
 342  351      else:
 343  352        raise Exception('unknown value of upload_if: %s' % upload_if)
 344  353
      354  +     # Set up a counter of files that have been uploaded.
      355  +     # Python's Global Interpreter Lock should make this thread-safe; see
      356  +     # http://www.gossamer-threads.com/lists/python/dev/273403
      357  +     num_files_to_upload = len(source_fileset)
      358  +     atomic_incrementer = iter(xrange(1, num_files_to_upload+1)).next
      359  +
 345  360      # Upload any files still in source_fileset.
 346       -     for rel_path in sorted(source_fileset):
 347       -       self.upload_file(
 348       -           source_path=os.path.join(source_dir, rel_path),
 349       -           dest_bucket=b,
 350       -           dest_path=posixpath.join(dest_dir, rel_path),
 351       -           upload_if=self.UploadIf.ALWAYS,
 352       -           **kwargs)
      361  +     self.logger.info('Uploading %d files, skipping %d ...' % (
      362  +         num_files_to_upload, num_files_total - num_files_to_upload))
      363  +     if num_files_to_upload == 0:
      364  +       return
      365  +     if num_threads > num_files_to_upload:
      366  +       num_threads = num_files_to_upload
borenet (2014/07/28 13:32:11): Maybe line 358 should be moved here so that it doe…
epoger (2014/07/28 14:14:13): Done. I don't think there is significant performa…
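A note on the idiom debated in this thread: atomic_incrementer (new line 358) works because, under CPython, the GIL makes a bound iterator .next call effectively atomic, so worker threads can each draw a unique, increasing value without an explicit lock. A minimal standalone sketch of that idiom (Python 2; the counts and names are illustrative, not part of gs_utils.py):

# Sketch of the GIL-backed counter idiom from new line 358.  Each call to
# counter() returns the next integer; no explicit lock is needed under
# CPython because a bound iterator .next call runs atomically.
from threading import Thread

num_items = 100
counter = iter(xrange(1, num_items + 1)).next

results = []

def worker():
  for _ in xrange(num_items / 4):
    results.append(counter())  # list.append is also atomic under the GIL

threads = [Thread(target=worker) for _ in range(4)]
for t in threads:
  t.start()
for t in threads:
  t.join()
assert sorted(results) == range(1, num_items + 1)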
      367  +     q = Queue(maxsize=2*num_threads)
      368  +     end_of_queue = object()
      369  +     def worker():
      370  +       while True:
      371  +         rel_path = q.get()
      372  +         if rel_path is end_of_queue:
      373  +           q.task_done()
      374  +           return
borenet (2014/07/28 13:32:11): Why not use q.get(False) or q.get_nowait() and try…
epoger (2014/07/28 14:14:14): The problem with checking for an empty queue is th…
borenet (2014/07/28 14:20:07): I was thinking of #2, since you're dealing with th…
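For context on the exchange above: a worker that polls with q.get_nowait() and exits on Queue.Empty only shuts down cleanly if every item is already in the queue before the workers start draining it; the sentinel object lets the producer keep feeding the bounded queue after the workers have started. A rough sketch of the get_nowait() variant (illustrative names; this is not the CL's code):

# Sketch of the alternative raised above: workers drain the queue with
# get_nowait() and exit on Queue.Empty.  This is only safe if every item is
# enqueued before any worker starts; with a bounded queue that is filled
# after the workers start (as in this CL), a worker could see a transiently
# empty queue and exit early.  Names below are illustrative.
from Queue import Queue, Empty
from threading import Thread

q = Queue()
for item in ['a.png', 'b.png', 'c.png']:  # everything enqueued up front
  q.put(item)

def worker():
  while True:
    try:
      item = q.get_nowait()
    except Empty:
      return  # queue drained; this worker is done
    print 'uploading %s' % item
    q.task_done()

threads = [Thread(target=worker) for _ in range(2)]
for t in threads:
  t.start()
q.join()  # returns once task_done() has been called for every put()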
      375  +         self.logger.info(' Uploading file %d/%d: %s' % (
      376  +             atomic_incrementer(), num_files_to_upload, rel_path))
      377  +         self.upload_file(
      378  +             source_path=os.path.join(source_dir, rel_path),
      379  +             dest_bucket=b,
      380  +             dest_path=posixpath.join(dest_dir, rel_path),
      381  +             upload_if=self.UploadIf.ALWAYS,
      382  +             **kwargs)
      383  +         q.task_done()
      384  +     # Spin up the threads.
      385  +     for _ in range(num_threads):
      386  +       t = Thread(target=worker)
      387  +       t.daemon = True
      388  +       t.start()
      389  +     # Start loading files into the work queue.
      390  +     for rel_path in source_fileset:
      391  +       q.put(rel_path)
      392  +     # Tell all workers to go home.
      393  +     for _ in range(num_threads):
      394  +       q.put(end_of_queue)
      395  +     # Block until all files have been uploaded and all workers have gone home.
      396  +     q.join()
 353  397
 354  398    def download_file(self, source_bucket, source_path, dest_path,
 355  399                      create_subdirs_if_needed=False):
 356  400      """Downloads a single file from Google Cloud Storage to local disk.
 357  401
 358  402      Args:
 359  403        source_bucket: GS bucket to download the file from
 360  404        source_path: full path (Posix-style) within that bucket
 361  405        dest_path: full path (local-OS-style) on local disk to copy the file to
 362  406        create_subdirs_if_needed: boolean; whether to create subdirectories as
(...skipping 240 matching lines...)
 603  647
 604  648  def _get_local_md5(path):
 605  649    """Returns the MD5 hash of a file on local disk."""
 606  650    hasher = hashlib.md5()
 607  651    with open(path, 'rb') as f:
 608  652      while True:
 609  653        data = f.read(64*1024)
 610  654        if not data:
 611  655          return hasher.hexdigest()
 612  656        hasher.update(data)
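Taken together, this patch set lets callers inject a logging.Logger and fan a directory upload out across worker threads. A hypothetical usage sketch (the import path, bucket name, and local paths are made up for illustration):

# Hypothetical caller of the API as modified by this CL; the import path,
# bucket, and local paths are illustrative only.
import logging

import gs_utils

logging.basicConfig(level=logging.INFO)
gs = gs_utils.GSUtils(boto_file_path='/home/user/.boto',
                      logger=logging.getLogger('gs-upload'))
gs.upload_dir_contents(
    source_dir='/tmp/render-results',
    dest_bucket='example-bucket',
    dest_dir='results/build-123',
    num_threads=10,
    upload_if=gs_utils.GSUtils.UploadIf.IF_MODIFIED)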