Chromium Code Reviews

Unified diff: py/utils/gs_utils.py

Issue 424553002: upload_dir_contents(): upload multiple files in parallel (Closed)
Base URL: https://skia.googlesource.com/common.git@master
Patch Set: add DEFAULT_UPLOAD_THREADS (created 6 years, 4 months ago)
 #!/usr/bin/python

 # pylint: disable=C0301
 """
 Copyright 2014 Google Inc.

 Use of this source code is governed by a BSD-style license that can be
 found in the LICENSE file.

 Utilities for accessing Google Cloud Storage, using the boto library (wrapper
 for the XML API).

 API/library references:
 - https://developers.google.com/storage/docs/reference-guide
 - http://googlecloudstorage.blogspot.com/2012/09/google-cloud-storage-tutorial-using-boto.html
 """
 # pylint: enable=C0301

 # System-level imports
 import errno
 import hashlib
+import logging
 import os
 import posixpath
+import Queue
 import re
 import sys
+import threading

 # Imports from third-party code
 TRUNK_DIRECTORY = os.path.abspath(os.path.join(
     os.path.dirname(__file__), os.pardir, os.pardir))
 for import_subdir in ['boto']:
   import_dirpath = os.path.join(
       TRUNK_DIRECTORY, 'third_party', 'externals', import_subdir)
   if import_dirpath not in sys.path:
     # We need to insert at the beginning of the path, to make sure that our
     # imported versions are favored over others that might be in the path.
     sys.path.insert(0, import_dirpath)
 from boto.exception import BotoServerError
 from boto.gs import acl
 from boto.gs.bucket import Bucket
 from boto.gs.connection import GSConnection
 from boto.gs.key import Key
 from boto.s3.bucketlistresultset import BucketListResultSet
 from boto.s3.connection import SubdomainCallingFormat
 from boto.s3.prefix import Prefix

+# How many files to upload at once, by default.
+# TODO(epoger): Is there a way to compute this intelligently? To some extent
+# it is a function of how many cores are on the machine, and how many other
+# processes it is running; but it's probably more a function of how much time
+# each core sits idle waiting for network I/O to complete.
+DEFAULT_UPLOAD_THREADS = 10
+
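[Note on the TODO above: one could imagine computing this default from the machine. A minimal sketch of one possible heuristic, offered only as an illustration and not part of this CL; the oversubscription factor and the cap are assumptions, chosen because upload threads mostly sit blocked on network I/O rather than using CPU.]

    import multiprocessing

    def guess_upload_threads(io_factor=4, max_threads=32):
      """Hypothetical heuristic for DEFAULT_UPLOAD_THREADS.

      Since each upload thread spends most of its time waiting on network
      I/O, allow io_factor threads per core, capped at max_threads so we
      don't swamp the connection pool. Both constants are guesses, not
      measurements.
      """
      try:
        cores = multiprocessing.cpu_count()
      except NotImplementedError:
        cores = 1
      return min(cores * io_factor, max_threads)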

 class AnonymousGSConnection(GSConnection):
   """GSConnection class that allows anonymous connections.

   The GSConnection class constructor in
   https://github.com/boto/boto/blob/develop/boto/gs/connection.py doesn't allow
   for anonymous connections (connections without credentials), so we have to
   override it.
   """
   def __init__(self):
(...skipping 53 matching lines...)
     take longer than just uploading the file.
     See http://skbug.com/2778 ('gs_utils: when uploading IF_NEW, batch up
     checks for existing files within a single remote directory')
     """
     ALWAYS = 1      # always upload the file
     IF_NEW = 2      # if there is an existing file with the same name,
                     # leave it alone
     IF_MODIFIED = 3 # if there is an existing file with the same name and
                     # contents, leave it alone

-  def __init__(self, boto_file_path=None):
+  def __init__(self, boto_file_path=None, logger=None):
     """Constructor.

     Params:
       boto_file_path: full path (local-OS-style) on local disk where .boto
           credentials file can be found. If None, then the GSUtils object
           created will be able to access only public files in Google Storage.
+      logger: a logging.Logger instance to use for logging output; if None,
+          one will be created with default characteristics

     Raises an exception if no file is found at boto_file_path, or if the file
     found there is malformed.
     """
+    self.logger = logger or logging.getLogger(__name__)
     self._gs_access_key_id = None
     self._gs_secret_access_key = None
     if boto_file_path:
-      print 'Reading boto file from %s' % boto_file_path
+      self.logger.info('Reading boto file from %s' % boto_file_path)
       boto_dict = _config_file_as_dict(filepath=boto_file_path)
       self._gs_access_key_id = boto_dict['gs_access_key_id']
       self._gs_secret_access_key = boto_dict['gs_secret_access_key']
     # Which field we get/set in ACL entries, depending on IdType.
     self._field_by_id_type = {
         self.IdType.GROUP_BY_DOMAIN: 'domain',
         self.IdType.GROUP_BY_EMAIL: 'email_address',
         self.IdType.GROUP_BY_ID: 'id',
         self.IdType.USER_BY_EMAIL: 'email_address',
         self.IdType.USER_BY_ID: 'id',
(...skipping 64 matching lines...)
         so that HTTP downloads of the file would be unzipped automatically.
         See https://developers.google.com/storage/docs/gsutil/addlhelp/
         WorkingWithObjectMetadata#content-encoding
     """
     b = self._connect_to_bucket(bucket=dest_bucket)
     local_md5 = None  # filled in lazily

     if upload_if == self.UploadIf.IF_NEW:
       old_key = b.get_key(key_name=dest_path)
       if old_key:
-        print 'Skipping upload of existing file gs://%s/%s' % (
-            b.name, dest_path)
+        self.logger.info('Skipping upload of existing file gs://%s/%s' % (
+            b.name, dest_path))
         return
     elif upload_if == self.UploadIf.IF_MODIFIED:
       old_key = b.get_key(key_name=dest_path)
       if old_key:
         if not local_md5:
           local_md5 = _get_local_md5(path=source_path)
         if ('"%s"' % local_md5) == old_key.etag:
-          print 'Skipping upload of unmodified file gs://%s/%s : %s' % (
-              b.name, dest_path, local_md5)
+          self.logger.info(
+              'Skipping upload of unmodified file gs://%s/%s : %s' % (
+                  b.name, dest_path, local_md5))
           return
     elif upload_if != self.UploadIf.ALWAYS:
       raise Exception('unknown value of upload_if: %s' % upload_if)

     # Upload the file using a temporary name at first, in case the transfer
     # is interrupted partway through.
     if not local_md5:
       local_md5 = _get_local_md5(path=source_path)
     initial_key = Key(b)
     initial_key.name = dest_path + '-uploading-' + local_md5
(...skipping 34 matching lines...)

     # Set ACLs on the file.
     # We do this *after* copy_key(), because copy_key's preserve_acl
     # functionality would incur a performance hit.
     for (id_type, id_value, permission) in fine_grained_acl_list or []:
       self.set_acl(
           bucket=b, path=final_key.name,
           id_type=id_type, id_value=id_value, permission=permission)

   def upload_dir_contents(self, source_dir, dest_bucket, dest_dir,
+                          num_threads=DEFAULT_UPLOAD_THREADS,
                           upload_if=UploadIf.ALWAYS, **kwargs):
     """Recursively upload contents of a local directory to Google Storage.

     params:
       source_dir: full path (local-OS-style) on local disk of directory to copy
           contents of
       dest_bucket: GS bucket to copy the files into
       dest_dir: full path (Posix-style) within that bucket; write the files into
           this directory. If None, write into the root directory of the bucket.
+      num_threads: how many files to upload at once
       upload_if: one of the UploadIf values, describing in which cases we should
           upload the file
       kwargs: any additional keyword arguments "inherited" from upload_file()

     The copy operates as a merge: any files in source_dir will be "overlaid" on
     top of the existing content in dest_dir. Existing files with the same names
     may or may not be overwritten, depending on the value of upload_if.

     TODO(epoger): Upload multiple files simultaneously to reduce latency.
     """
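[As an aside for reviewers, a minimal usage sketch of the new num_threads parameter; the bucket name and local paths are hypothetical, not taken from this CL:]

    gs = GSUtils(boto_file_path=os.path.expanduser('~/.boto'))
    gs.upload_dir_contents(
        source_dir='/tmp/render-output',   # hypothetical local directory
        dest_bucket='example-bucket',      # hypothetical bucket name
        dest_dir='builds/latest',
        num_threads=DEFAULT_UPLOAD_THREADS,
        upload_if=GSUtils.UploadIf.IF_MODIFIED)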
     b = self._connect_to_bucket(bucket=dest_bucket)
     if not dest_dir:
       dest_dir = ''

     # Create a set of all files within source_dir.
     source_fileset = set()
     prefix_length = len(source_dir)+1
     for dirpath, _, filenames in os.walk(source_dir):
       relative_dirpath = dirpath[prefix_length:]
       for filename in filenames:
         source_fileset.add(os.path.join(relative_dirpath, filename))
+    num_files_total = len(source_fileset)

     # If we are only uploading files conditionally, remove any unnecessary
     # files from source_fileset.
     if upload_if == self.UploadIf.ALWAYS:
       pass  # there are no shortcuts... upload them all
     else:
       # Create a mapping of filename to Key for existing files within dest_dir
       existing_dest_filemap = {}
       prefix = dest_dir
       if prefix and not prefix.endswith('/'):
(...skipping 13 matching lines...)
         for rel_path in files_in_common:
           local_md5 = '"%s"' % _get_local_md5(path=os.path.join(
               source_dir, rel_path))
           key = existing_dest_filemap[rel_path]
           if local_md5 == key.etag:
             source_fileset.remove(rel_path)
       else:
         raise Exception('unknown value of upload_if: %s' % upload_if)

     # Upload any files still in source_fileset.
-    for rel_path in sorted(source_fileset):
-      self.upload_file(
-          source_path=os.path.join(source_dir, rel_path),
-          dest_bucket=b,
-          dest_path=posixpath.join(dest_dir, rel_path),
-          upload_if=self.UploadIf.ALWAYS,
-          **kwargs)
+    num_files_to_upload = len(source_fileset)
+    self.logger.info('Uploading %d files, skipping %d ...' % (
+        num_files_to_upload, num_files_total - num_files_to_upload))
+    if num_files_to_upload == 0:
+      return
+    if num_threads > num_files_to_upload:
+      num_threads = num_files_to_upload
+
+    # Create a work queue with all files that need to be uploaded.
+    q = Queue.Queue(maxsize=num_files_to_upload)
+    for rel_path in source_fileset:
+      q.put(rel_path)
+
+    # Spin up worker threads to read from the task queue.
+    def worker():
+      while True:
+        try:
+          rel_path = q.get(block=False)
+        except Queue.Empty:
+          return  # no more tasks in the queue, so exit
+        self.logger.info(' Uploading file %d/%d: %s' % (
+            num_files_to_upload - q.qsize(), num_files_to_upload, rel_path))
+        self.upload_file(
+            source_path=os.path.join(source_dir, rel_path),
+            dest_bucket=b,
+            dest_path=posixpath.join(dest_dir, rel_path),
+            upload_if=self.UploadIf.ALWAYS,
+            **kwargs)
+        q.task_done()
+    for _ in range(num_threads):
+      t = threading.Thread(target=worker)
+      t.daemon = True
+      t.start()
+
+    # Block until all files have been uploaded and all workers have exited.
+    q.join()

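[One behavior of this pattern worth noting in review: because every task is enqueued before any worker starts, the non-blocking q.get() is safe; but if upload_file() raises, the exception kills that worker thread before q.task_done() is called, so q.join() can block forever. A sketch of one possible hardening, hypothetical and not part of this patch, that records failures and re-raises after the join:]

    errors = []  # list.append is atomic under the GIL, so no lock is needed

    def worker():
      while True:
        try:
          rel_path = q.get(block=False)
        except Queue.Empty:
          return
        try:
          self.upload_file(
              source_path=os.path.join(source_dir, rel_path),
              dest_bucket=b,
              dest_path=posixpath.join(dest_dir, rel_path),
              upload_if=self.UploadIf.ALWAYS,
              **kwargs)
        except Exception as e:  # pylint: disable=W0703
          errors.append((rel_path, e))
        finally:
          q.task_done()  # always called, so q.join() cannot hang

    for _ in range(num_threads):
      t = threading.Thread(target=worker)
      t.daemon = True
      t.start()
    q.join()
    if errors:
      raise Exception('failed to upload %d file(s): %s' % (len(errors), errors))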
   def download_file(self, source_bucket, source_path, dest_path,
                     create_subdirs_if_needed=False):
     """Downloads a single file from Google Cloud Storage to local disk.

     Args:
       source_bucket: GS bucket to download the file from
       source_path: full path (Posix-style) within that bucket
       dest_path: full path (local-OS-style) on local disk to copy the file to
       create_subdirs_if_needed: boolean; whether to create subdirectories as
(...skipping 240 matching lines...)

 def _get_local_md5(path):
   """Returns the MD5 hash of a file on local disk."""
   hasher = hashlib.md5()
   with open(path, 'rb') as f:
     while True:
       data = f.read(64*1024)
       if not data:
         return hasher.hexdigest()
       hasher.update(data)
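[This helper underpins the IF_MODIFIED checks above: for non-composite objects, Google Storage's XML API reports the object's MD5 hex digest, wrapped in double quotes, as its etag, so a local file can be compared against a remote key without downloading anything. A small hypothetical helper, not in this CL, shows the comparison in isolation:]

    def _remote_matches_local(bucket, remote_path, local_path):
      # Hypothetical helper: True if the GS object's contents match the
      # local file, judged by comparing the quoted-MD5 etag that Google
      # Storage reports for non-composite objects.
      key = bucket.get_key(key_name=remote_path)
      if key is None:
        return False
      return key.etag == '"%s"' % _get_local_md5(path=local_path)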