Chromium Code Reviews

Diff: third_party/gsutil/gslib/command.py

Issue 1380943003: Roll version of gsutil to 4.15. (Closed) Base URL: https://github.com/catapult-project/catapult.git@master
Patch Set: rebase Created 5 years ago
 # -*- coding: utf-8 -*-
 # Copyright 2010 Google Inc. All Rights Reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at
 #
 #     http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
(...skipping 32 matching lines...)
 from gslib.cloud_api import AccessDeniedException
 from gslib.cloud_api import ArgumentException
 from gslib.cloud_api import ServiceException
 from gslib.cloud_api_delegator import CloudApiDelegator
 from gslib.cs_api_map import ApiSelector
 from gslib.cs_api_map import GsutilApiMapFactory
 from gslib.exception import CommandException
 from gslib.help_provider import HelpProvider
 from gslib.name_expansion import NameExpansionIterator
 from gslib.name_expansion import NameExpansionResult
-from gslib.parallelism_framework_util import AtomicIncrementDict
-from gslib.parallelism_framework_util import BasicIncrementDict
-from gslib.parallelism_framework_util import ThreadAndProcessSafeDict
+from gslib.parallelism_framework_util import AtomicDict
 from gslib.plurality_checkable_iterator import PluralityCheckableIterator
 from gslib.sig_handling import RegisterSignalHandler
 from gslib.storage_url import StorageUrlFromString
 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
 from gslib.translation_helper import AclTranslation
+from gslib.translation_helper import PRIVATE_DEFAULT_OBJ_ACL
+from gslib.util import CheckMultiprocessingAvailableAndInit
 from gslib.util import GetConfigFilePath
 from gslib.util import GsutilStreamHandler
 from gslib.util import HaveFileUrls
 from gslib.util import HaveProviderUrls
 from gslib.util import IS_WINDOWS
-from gslib.util import MultiprocessingIsAvailable
 from gslib.util import NO_MAX
 from gslib.util import UrlsAreForSingleProvider
 from gslib.util import UTF8
 from gslib.wildcard_iterator import CreateWildcardIterator

 OFFER_GSUTIL_M_SUGGESTION_THRESHOLD = 5

 if IS_WINDOWS:
   import ctypes  # pylint: disable=g-import-not-at-top

(...skipping 137 matching lines...)

   # List of all existing task queues - used by all pools to find the queue
   # that's appropriate for the given recursive_apply_level.
   task_queues = []

   # Used to assign a globally unique caller ID to each Apply call.
   caller_id_lock = manager.Lock()
   caller_id_counter = multiprocessing.Value('i', 0)

   # Map from caller_id to total number of tasks to be completed for that ID.
-  total_tasks = ThreadAndProcessSafeDict(manager)
+  total_tasks = AtomicDict(manager=manager)

   # Map from caller_id to a boolean which is True iff all its tasks are
   # finished.
-  call_completed_map = ThreadAndProcessSafeDict(manager)
+  call_completed_map = AtomicDict(manager=manager)

   # Used to keep track of the set of return values for each caller ID.
-  global_return_values_map = AtomicIncrementDict(manager)
+  global_return_values_map = AtomicDict(manager=manager)

   # Condition used to notify any waiting threads that a task has finished or
   # that a call to Apply needs a new set of consumer processes.
   need_pool_or_done_cond = manager.Condition()

   # Lock used to prevent multiple worker processes from asking the main thread
   # to create a new consumer pool for the same level.
   worker_checking_level_lock = manager.Lock()

   # Map from caller_id to the current number of completed tasks for that ID.
-  caller_id_finished_count = AtomicIncrementDict(manager)
+  caller_id_finished_count = AtomicDict(manager=manager)

   # Used as a way for the main thread to distinguish between being woken up
   # by another call finishing and being woken up by a call that needs a new set
   # of consumer processes.
   new_pool_needed = multiprocessing.Value('i', 0)

   current_max_recursive_level = multiprocessing.Value('i', 0)

   # Map from (caller_id, name) to the value of that shared variable.
-  shared_vars_map = AtomicIncrementDict(manager)
-  shared_vars_list_map = ThreadAndProcessSafeDict(manager)
+  shared_vars_map = AtomicDict(manager=manager)
+  shared_vars_list_map = AtomicDict(manager=manager)

   # Map from caller_id to calling class.
   class_map = manager.dict()

   # Number of tasks that resulted in an exception in calls to Apply().
   failure_count = multiprocessing.Value('i', 0)


+def InitializeThreadingVariables():
+  """Initializes module-level variables used when running multi-threaded.
+
+  When multiprocessing is not available (or on Windows where only 1 process
+  is used), thread-safe analogs to the multiprocessing global variables
+  must be initialized. This function is the thread-safe analog to
+  InitializeMultiprocessingVariables.
+  """
+  # pylint: disable=global-variable-undefined
+  global global_return_values_map, shared_vars_map, failure_count
+  global caller_id_finished_count, shared_vars_list_map, total_tasks
+  global need_pool_or_done_cond, call_completed_map, class_map
+  global task_queues, caller_id_lock, caller_id_counter
+  caller_id_counter = 0
+  caller_id_finished_count = AtomicDict()
+  caller_id_lock = threading.Lock()
+  call_completed_map = AtomicDict()
+  class_map = AtomicDict()
+  failure_count = 0
+  global_return_values_map = AtomicDict()
+  need_pool_or_done_cond = threading.Condition()
+  shared_vars_list_map = AtomicDict()
+  shared_vars_map = AtomicDict()
+  task_queues = []
+  total_tasks = AtomicDict()
+
+
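For context: this roll collapses the three dict wrappers used previously (AtomicIncrementDict, BasicIncrementDict, ThreadAndProcessSafeDict) into a single AtomicDict that serves both the threading and multiprocessing modes. A minimal sketch of the interface this file relies on (dict-style access, .get(), and an atomic Increment); the real implementation lives in gslib/parallelism_framework_util.py and may differ in detail:

import threading

class AtomicDict(object):
  """Sketch of a lock-guarded dict; pass a multiprocessing.Manager to
  share it across processes."""

  def __init__(self, manager=None):
    if manager:
      self.lock = manager.Lock()
      self.dict = manager.dict()
    else:
      self.lock = threading.Lock()
      self.dict = {}

  def __getitem__(self, key):
    with self.lock:
      return self.dict[key]

  def __setitem__(self, key, value):
    with self.lock:
      self.dict[key] = value

  def get(self, key, default_value=None):
    with self.lock:
      return self.dict.get(key, default_value)

  def Increment(self, key, inc, default_value=0):
    # Atomic read-modify-write; inc may be a number or (for the return
    # values map) a list to concatenate.
    with self.lock:
      val = self.dict.get(key, default_value)
      self.dict[key] = val + inc
      return self.dict[key]
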
 # Each subclass of Command must define a property named 'command_spec' that is
 # an instance of the following class.
 CommandSpec = namedtuple('CommandSpec', [
     # Name of command.
     'command_name',
     # Usage synopsis.
     'usage_synopsis',
     # List of command name aliases.
     'command_name_aliases',
     # Min number of args required by this command.
(...skipping 80 matching lines...)
       args = new_command_args[1:] + args
       self.logger.warn('\n'.join(textwrap.wrap(
           ('You are using a deprecated alias, "%(used_alias)s", for the '
            '"%(command_name)s" command. This will stop working on 9/9/2014. '
            'Please use "%(command_name)s" with the appropriate sub-command in '
            'the future. See "gsutil help %(command_name)s" for details.') %
           {'used_alias': self.command_alias_used,
            'command_name': self.command_name})))
     return args

-  def __init__(self, command_runner, args, headers, debug, parallel_operations,
-               bucket_storage_uri_class, gsutil_api_class_map_factory,
-               test_method=None, logging_filters=None,
-               command_alias_used=None):
+  def __init__(self, command_runner, args, headers, debug, trace_token,
+               parallel_operations, bucket_storage_uri_class,
+               gsutil_api_class_map_factory, logging_filters=None,
+               command_alias_used=None):
     """Instantiates a Command.

     Args:
       command_runner: CommandRunner (for commands built atop other commands).
       args: Command-line args (arg0 = actual arg, not command name ala bash).
       headers: Dictionary containing optional HTTP headers to pass to boto.
       debug: Debug level to pass in to boto connection (range 0..3).
+      trace_token: Trace token to pass to the API implementation.
       parallel_operations: Should command operations be executed in parallel?
       bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
                                 Settable for testing/mocking.
       gsutil_api_class_map_factory: Creates map of cloud storage interfaces.
                                     Settable for testing/mocking.
-      test_method: Optional general purpose method for testing purposes.
-                   Application and semantics of this method will vary by
-                   command and test type.
       logging_filters: Optional list of logging.Filters to apply to this
                        command's logger.
       command_alias_used: The alias that was actually used when running this
                           command (as opposed to the "official" command name,
                           which will always correspond to the file name).

     Implementation note: subclasses shouldn't need to define an __init__
     method, and instead depend on the shared initialization that happens
     here. If you do define an __init__ method in a subclass you'll need to
     explicitly call super().__init__(). But you're encouraged not to do this,
     because it will make changing the __init__ interface more painful.
     """
     # Save class values from constructor params.
     self.command_runner = command_runner
     self.unparsed_args = args
     self.headers = headers
     self.debug = debug
+    self.trace_token = trace_token
     self.parallel_operations = parallel_operations
     self.bucket_storage_uri_class = bucket_storage_uri_class
     self.gsutil_api_class_map_factory = gsutil_api_class_map_factory
-    self.test_method = test_method
     self.exclude_symlinks = False
     self.recursion_requested = False
     self.all_versions = False
     self.command_alias_used = command_alias_used

     # Global instance of a threaded logger object.
     self.logger = CreateGsutilLogger(self.command_name)
     if logging_filters:
       for log_filter in logging_filters:
         self.logger.addFilter(log_filter)
(...skipping 26 matching lines...)
     default_map = {
         'gs': self.command_spec.gs_default_api,
         's3': ApiSelector.XML
     }
     self.gsutil_api_map = GsutilApiMapFactory.GetApiMap(
         self.gsutil_api_class_map_factory, support_map, default_map)

     self.project_id = None
     self.gsutil_api = CloudApiDelegator(
         bucket_storage_uri_class, self.gsutil_api_map,
-        self.logger, debug=self.debug)
+        self.logger, debug=self.debug, trace_token=self.trace_token)

     # Cross-platform path to run gsutil binary.
     self.gsutil_cmd = ''
     # If running on Windows, invoke python interpreter explicitly.
     if gslib.util.IS_WINDOWS:
       self.gsutil_cmd += 'python '
     # Add full path to gsutil to make sure we test the correct version.
     self.gsutil_path = gslib.GSUTIL_PATH
     self.gsutil_cmd += self.gsutil_path

     # We're treating recursion_requested like it's used by all commands, but
     # only some of the commands accept the -R option.
     if self.sub_opts:
       for o, unused_a in self.sub_opts:
         if o == '-r' or o == '-R':
           self.recursion_requested = True
           break

-    self.multiprocessing_is_available = MultiprocessingIsAvailable()[0]
+    self.multiprocessing_is_available = (
+        CheckMultiprocessingAvailableAndInit().is_available)

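The old MultiprocessingIsAvailable() returned a tuple that callers indexed with [0]; the replacement returns a structured result read via .is_available. A hedged sketch of that shape (hypothetical internals; the real helper in gslib/util.py also initializes the module-level multiprocessing variables and caches its result):

import multiprocessing
import traceback
from collections import namedtuple

# Hypothetical result type; the real one is defined in gslib/util.py.
MultiprocessingAvailability = namedtuple(
    'MultiprocessingAvailability', ['is_available', 'stack_trace'])

def CheckMultiprocessingAvailableAndInit(logger=None):
  """Probes whether multiprocessing primitives work on this platform."""
  try:
    # Creating a Manager and a synchronization primitive fails on some
    # platforms (e.g. sandboxed or misconfigured systems).
    manager = multiprocessing.Manager()
    manager.Lock()
    return MultiprocessingAvailability(True, None)
  except Exception:  # pylint: disable=broad-except
    if logger:
      logger.warn('Multiprocessing unavailable; falling back to threads.')
    return MultiprocessingAvailability(False, traceback.format_exc())
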
   def RaiseWrongNumberOfArgumentsException(self):
     """Raises exception for wrong number of arguments supplied to command."""
     if len(self.args) < self.command_spec.min_args:
       tail_str = 's' if self.command_spec.min_args > 1 else ''
       message = ('The %s command requires at least %d argument%s.' %
                  (self.command_name, self.command_spec.min_args, tail_str))
     else:
       message = ('The %s command accepts at most %d arguments.' %
                  (self.command_name, self.command_spec.max_args))
(...skipping 204 matching lines...)
     try:
       if url.IsBucket():
         if self.def_acl:
           if self.canned:
             gsutil_api.PatchBucket(
                 url.bucket_name, apitools_messages.Bucket(),
                 canned_def_acl=self.acl_arg, provider=url.scheme, fields=['id'])
           else:
             def_obj_acl = AclTranslation.JsonToMessage(
                 self.acl_arg, apitools_messages.ObjectAccessControl)
+            if not def_obj_acl:
+              # Use a sentinel value to indicate a private (no entries) default
+              # object ACL.
+              def_obj_acl.append(PRIVATE_DEFAULT_OBJ_ACL)
             bucket_metadata = apitools_messages.Bucket(
                 defaultObjectAcl=def_obj_acl)
             gsutil_api.PatchBucket(url.bucket_name, bucket_metadata,
                                    provider=url.scheme, fields=['id'])
         else:
           if self.canned:
             gsutil_api.PatchBucket(
                 url.bucket_name, apitools_messages.Bucket(),
                 canned_acl=self.acl_arg, provider=url.scheme, fields=['id'])
           else:
(...skipping 40 matching lines...)
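Why a sentinel rather than just an empty list: in a JSON patch request an empty repeated field is serialized the same way as an unset one, so "make the default object ACL private (no entries)" would be silently dropped by the server. My reading (hedged) is that PRIVATE_DEFAULT_OBJ_ACL is a dummy ObjectAccessControl entry that the API layer recognizes and strips before issuing the request. A sketch of the pattern with hypothetical names:

# Hypothetical sketch; the real sentinel is an ObjectAccessControl message
# defined in gslib/translation_helper.py.
PRIVATE_DEFAULT_OBJ_ACL = object()

def MarkPrivateDefaultAcl(def_obj_acl):
  # An empty list would be indistinguishable from "field unset" once
  # serialized, so mark the private (empty) ACL with a sentinel that the
  # API layer recognizes and removes before sending the patch.
  if not def_obj_acl:
    def_obj_acl.append(PRIVATE_DEFAULT_OBJ_ACL)
  return def_obj_acl
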
                            self.command_name)

     # Determine whether acl_arg names a file containing XML ACL text vs. the
     # string name of a canned ACL.
     if os.path.isfile(acl_arg):
       with codecs.open(acl_arg, 'r', UTF8) as f:
         acl_arg = f.read()
       self.canned = False
     else:
       # No file exists, so expect a canned ACL string.
-      # Canned ACLs are not supported in JSON and we need to use the XML API
-      # to set them.
       # validate=False because we allow wildcard urls.
       storage_uri = boto.storage_uri(
           url_args[0], debug=self.debug, validate=False,
           bucket_storage_uri_class=self.bucket_storage_uri_class)

       canned_acls = storage_uri.canned_acls()
       if acl_arg not in canned_acls:
         raise CommandException('Invalid canned ACL "%s".' % acl_arg)
       self.canned = True

(...skipping 95 matching lines...)
         self.WildcardIterator(url_str).IterBuckets(
             bucket_fields=bucket_fields))
     if plurality_iter.IsEmpty():
       raise CommandException('No URLs matched')
     if plurality_iter.HasPlurality():
       raise CommandException(
           '%s matched more than one URL, which is not allowed by the %s '
           'command' % (url_str, self.command_name))
     return list(plurality_iter)[0]

-  def _HandleMultiProcessingSigs(self, unused_signal_num,
-                                 unused_cur_stack_frame):
+  def _HandleMultiProcessingSigs(self, signal_num, unused_cur_stack_frame):
     """Handles signals INT AND TERM during a multi-process/multi-thread request.

     Kills subprocesses.

     Args:
       unused_signal_num: signal generated by ^C.
       unused_cur_stack_frame: Current stack frame.
     """
     # Note: This only works under Linux/MacOS. See
     # https://github.com/GoogleCloudPlatform/gsutil/issues/99 for details
     # about why making it work correctly across OS's is harder and still open.
     ShutDownGsutil()
-    sys.stderr.write('Caught ^C - exiting\n')
+    if signal_num == signal.SIGINT:
+      sys.stderr.write('Caught ^C - exiting\n')
     # Simply calling sys.exit(1) doesn't work - see above bug for details.
     KillProcess(os.getpid())

   def GetSingleBucketUrlFromArg(self, arg, bucket_fields=None):
     """Gets a single bucket URL based on the command arguments.

     Args:
       arg: String argument to get bucket URL for.
       bucket_fields: Fields to populate for the bucket.

(...skipping 101 matching lines...)
               'update your config file (located at %s) and set '
               '"parallel_process_count = 1".') %
              GetConfigFilePath())))
     self.logger.debug('process count: %d', process_count)
     self.logger.debug('thread count: %d', thread_count)

     return (process_count, thread_count)

   def _SetUpPerCallerState(self):
     """Set up the state for a caller id, corresponding to one Apply call."""
+    # pylint: disable=global-variable-undefined,global-variable-not-assigned
+    # These variables are initialized in InitializeMultiprocessingVariables or
+    # InitializeThreadingVariables
+    global global_return_values_map, shared_vars_map, failure_count
+    global caller_id_finished_count, shared_vars_list_map, total_tasks
+    global need_pool_or_done_cond, call_completed_map, class_map
+    global task_queues, caller_id_lock, caller_id_counter
     # Get a new caller ID.
     with caller_id_lock:
-      caller_id_counter.value += 1
-      caller_id = caller_id_counter.value
+      if isinstance(caller_id_counter, int):
+        caller_id_counter += 1
+        caller_id = caller_id_counter
+      else:
+        caller_id_counter.value += 1
+        caller_id = caller_id_counter.value

     # Create a copy of self with an incremented recursive level. This allows
     # the class to report its level correctly if the function called from it
     # also needs to call Apply.
     cls = copy.copy(self)
     cls.recursive_apply_level += 1

     # Thread-safe loggers can't be pickled, so we will remove it here and
     # recreate it later in the WorkerThread. This is not a problem since any
     # logger with the same name will be treated as a singleton.
     cls.logger = None

     # Likewise, the default API connection can't be pickled, but it is unused
     # anyway as each thread gets its own API delegator.
     cls.gsutil_api = None

     class_map[caller_id] = cls
     total_tasks[caller_id] = -1  # -1 => the producer hasn't finished yet.
     call_completed_map[caller_id] = False
-    caller_id_finished_count.Put(caller_id, 0)
-    global_return_values_map.Put(caller_id, [])
+    caller_id_finished_count[caller_id] = 0
+    global_return_values_map[caller_id] = []
     return caller_id
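The counter now has two possible representations: a plain int (threading mode, guarded by a threading.Lock) or a multiprocessing.Value (process mode). A minimal standalone sketch of that duck-typed increment, assuming the same two initialization paths shown in InitializeThreadingVariables and InitializeMultiprocessingVariables:

import threading

caller_id_lock = threading.Lock()
caller_id_counter = 0  # Or multiprocessing.Value('i', 0) in process mode.

def GetNewCallerId():
  # Works against either representation of the counter.
  global caller_id_counter
  with caller_id_lock:
    if isinstance(caller_id_counter, int):
      caller_id_counter += 1          # Threading mode: plain int.
      return caller_id_counter
    else:
      caller_id_counter.value += 1    # Multiprocessing mode: shared Value.
      return caller_id_counter.value

assert GetNewCallerId() == 1
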

   def _CreateNewConsumerPool(self, num_processes, num_threads):
     """Create a new pool of processes that call _ApplyThreads."""
     processes = []
     task_queue = _NewMultiprocessingQueue()
     task_queues.append(task_queue)

     current_max_recursive_level.value += 1
     if current_max_recursive_level.value > MAX_RECURSIVE_DEPTH:
(...skipping 56 matching lines...)
                       and self.sequential_caller_id == -1)

     # We don't honor the fail_on_error flag in the case of multiple threads
     # or processes.
     fail_on_error = fail_on_error and (process_count * thread_count == 1)

     # Only check this from the first call in the main thread. Apart from the
     # fact that it's wasteful to try this multiple times in general, it also
     # will never work when called from a subprocess since we use daemon
     # processes, and daemons can't create other processes.
-    if is_main_thread:
-      if ((not self.multiprocessing_is_available)
-          and thread_count * process_count > 1):
-        # Run the check again and log the appropriate warnings. This was run
-        # before, when the Command object was created, in order to calculate
-        # self.multiprocessing_is_available, but we don't want to print the
-        # warning until we're sure the user actually tried to use multiple
-        # threads or processes.
-        MultiprocessingIsAvailable(logger=self.logger)
+    if (is_main_thread and not self.multiprocessing_is_available and
+        process_count > 1):
+      # Run the check again and log the appropriate warnings. This was run
+      # before, when the Command object was created, in order to calculate
+      # self.multiprocessing_is_available, but we don't want to print the
+      # warning until we're sure the user actually tried to use multiple
+      # threads or processes.
+      CheckMultiprocessingAvailableAndInit(logger=self.logger)

-    if self.multiprocessing_is_available:
-      caller_id = self._SetUpPerCallerState()
-    else:
-      self.sequential_caller_id += 1
-      caller_id = self.sequential_caller_id
-
-      if is_main_thread:
-        # pylint: disable=global-variable-undefined
-        global global_return_values_map, shared_vars_map, failure_count
-        global caller_id_finished_count, shared_vars_list_map
-        global_return_values_map = BasicIncrementDict()
-        global_return_values_map.Put(caller_id, [])
-        shared_vars_map = BasicIncrementDict()
-        caller_id_finished_count = BasicIncrementDict()
-        shared_vars_list_map = {}
-        failure_count = 0
+    caller_id = self._SetUpPerCallerState()

     # If any shared attributes passed by caller, create a dictionary of
     # shared memory variables for every element in the list of shared
     # attributes.
     if shared_attrs:
       shared_vars_list_map[caller_id] = shared_attrs
       for name in shared_attrs:
-        shared_vars_map.Put((caller_id, name), 0)
+        shared_vars_map[(caller_id, name)] = 0

     # Make all of the requested function calls.
-    if self.multiprocessing_is_available and thread_count * process_count > 1:
+    usable_processes_count = (process_count if self.multiprocessing_is_available
+                              else 1)
+    if thread_count * usable_processes_count > 1:
       self._ParallelApply(func, args_iterator, exception_handler, caller_id,
-                          arg_checker, process_count, thread_count,
+                          arg_checker, usable_processes_count, thread_count,
                           should_return_results, fail_on_error)
     else:
       self._SequentialApply(func, args_iterator, exception_handler, caller_id,
                             arg_checker, should_return_results, fail_on_error)

     if shared_attrs:
       for name in shared_attrs:
         # This allows us to retain the original value of the shared variable,
         # and simply apply the delta after what was done during the call to
         # apply.
         final_value = (original_shared_vars_values[name] +
-                       shared_vars_map.Get((caller_id, name)))
+                       shared_vars_map.get((caller_id, name)))
         setattr(self, name, final_value)

     if should_return_results:
-      return global_return_values_map.Get(caller_id)
+      return global_return_values_map.get(caller_id)

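The shared_attrs bookkeeping works in three steps: the caller snapshots its attribute values before Apply, workers accumulate deltas in shared_vars_map, and afterward the snapshot plus the accumulated delta is written back. A tiny worked example of that arithmetic (attribute name hypothetical):

# Hypothetical attribute tracked as a shared_attr.
original_shared_vars_values = {'total_bytes_transferred': 1024}

# Workers publish deltas, never absolute values: worker A adds 512 and
# worker B adds 256, so shared_vars_map holds 768 for this key.
accumulated_delta = 512 + 256

# After Apply returns, the snapshot plus the accumulated delta is restored:
final_value = (original_shared_vars_values['total_bytes_transferred'] +
               accumulated_delta)
assert final_value == 1792
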
   def _MaybeSuggestGsutilDashM(self):
     """Outputs a suggestion to the user to use gsutil -m."""
     if not (boto.config.getint('GSUtil', 'parallel_process_count', 0) == 1 and
             boto.config.getint('GSUtil', 'parallel_thread_count', 0) == 1):
       self.logger.info('\n' + textwrap.fill(
           '==> NOTE: You are performing a sequence of gsutil operations that '
           'may run significantly faster if you instead use gsutil -m %s ...\n'
           'Please see the -m section under "gsutil help options" for further '
           'information about when gsutil -m can be advantageous.'
(...skipping 44 matching lines...)
       if arg_checker(self, args):
         # Now that we actually have the next argument, perform the task.
         task = Task(func, args, caller_id, exception_handler,
                     should_return_results, arg_checker, fail_on_error)
         worker_thread.PerformTask(task, self)
     if sequential_call_count >= gslib.util.GetTermLines():
       # Output suggestion at end of long run, in case user missed it at the
       # start and it scrolled off-screen.
       self._MaybeSuggestGsutilDashM()

+    # If the final iterated argument results in an exception, and that
+    # exception modifies shared_attrs, we need to publish the results.
+    worker_thread.shared_vars_updater.Update(caller_id, self)
+
   # pylint: disable=g-doc-args
   def _ParallelApply(self, func, args_iterator, exception_handler, caller_id,
                      arg_checker, process_count, thread_count,
                      should_return_results, fail_on_error):
     """Dispatches input arguments across a thread/process pool.

     Pools are composed of parallel OS processes and/or Python threads,
     based on options (-m or not) and settings in the user's config file.

     If only one OS process is requested/available, dispatch requests across
(...skipping 41 matching lines...)
     RegisterSignalHandler(signal.SIGINT, self._HandleMultiProcessingSigs,
                           is_final_handler=True)
     RegisterSignalHandler(signal.SIGTERM, self._HandleMultiProcessingSigs,
                           is_final_handler=True)

     if not task_queues:
       # The process we create will need to access the next recursive level
       # of task queues if it makes a call to Apply, so we always keep around
       # one more queue than we know we need. OTOH, if we don't create a new
       # process, the existing process still needs a task queue to use.
-      task_queues.append(_NewMultiprocessingQueue())
+      if process_count > 1:
+        task_queues.append(_NewMultiprocessingQueue())
+      else:
+        task_queues.append(_NewThreadsafeQueue())

     if process_count > 1:  # Handle process pool creation.
       # Check whether this call will need a new set of workers.

       # Each worker must acquire a shared lock before notifying the main thread
       # that it needs a new worker pool, so that at most one worker asks for
       # a new worker pool at once.
       try:
         if not is_main_thread:
           worker_checking_level_lock.acquire()
(...skipping 11 matching lines...)
             need_pool_or_done_cond.wait()
       finally:
         if not is_main_thread:
           worker_checking_level_lock.release()

     # If we're running in this process, create a separate task queue. Otherwise,
     # if Apply has already been called with process_count > 1, then there will
     # be consumer pools trying to use our processes.
     if process_count > 1:
       task_queue = task_queues[self.recursive_apply_level]
+    elif self.multiprocessing_is_available:
+      task_queue = _NewMultiprocessingQueue()
     else:
-      task_queue = _NewMultiprocessingQueue()
+      task_queue = _NewThreadsafeQueue()

     # Kick off a producer thread to throw tasks in the global task queue. We
     # do this asynchronously so that the main thread can be free to create new
     # consumer pools when needed (otherwise, any thread with a task that needs
     # a new consumer pool must block until we're completely done producing; in
     # the worst case, every worker blocks on such a call and the producer fills
     # up the task queue before it finishes, so we block forever).
     producer_thread = ProducerThread(copy.copy(self), args_iterator, caller_id,
                                      func, task_queue, should_return_results,
                                      exception_handler, arg_checker,
(...skipping 57 matching lines...)
                    of this thread.
       is_blocking_call: True iff the call to Apply is blocked on this call
                         (which is true iff process_count == 1), implying that
                         _ApplyThreads must behave as a blocking call.
     """
     self._ResetConnectionPool()
     self.recursive_apply_level = recursive_apply_level

     task_queue = task_queue or task_queues[recursive_apply_level]

+    # Ensure fairness across processes by filling our WorkerPool only with
+    # as many tasks as it has WorkerThreads. This semaphore is acquired each
+    # time that a task is retrieved from the queue and released each time
+    # a task is completed by a WorkerThread.
+    worker_semaphore = threading.BoundedSemaphore(thread_count)
+
     assert thread_count * process_count > 1, (
         'Invalid state, calling command._ApplyThreads with only one thread '
         'and process.')
+    # TODO: Presently, this pool gets recreated with each call to Apply. We
+    # should be able to do it just once, at process creation time.
     worker_pool = WorkerPool(
-        thread_count, self.logger,
+        thread_count, self.logger, worker_semaphore,
         bucket_storage_uri_class=self.bucket_storage_uri_class,
         gsutil_api_map=self.gsutil_api_map, debug=self.debug)

     num_enqueued = 0
     while True:
+      worker_semaphore.acquire()
       task = task_queue.get()
       if task.args != ZERO_TASKS_TO_DO_ARGUMENT:
         # If we have no tasks to do and we're performing a blocking call, we
         # need a special signal to tell us to stop - otherwise, we block on
         # the call to task_queue.get() forever.
         worker_pool.AddTask(task)
         num_enqueued += 1
+      else:
+        # No tasks remain; don't block the semaphore on WorkerThread completion.
+        worker_semaphore.release()

       if is_blocking_call:
         num_to_do = total_tasks[task.caller_id]
         # The producer thread won't enqueue the last task until after it has
         # updated total_tasks[caller_id], so we know that num_to_do < 0 implies
         # we will do this check again.
         if num_to_do >= 0 and num_enqueued == num_to_do:
           if thread_count == 1:
             return
           else:
(...skipping 157 matching lines...)
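The semaphore added above implements simple back-pressure: the dispatcher may only have thread_count tasks outstanding in a pool at once, so a fast producer cannot pile work onto one process while another sits idle. A self-contained sketch of the same pattern (names hypothetical):

import threading
try:
  import Queue as queue  # Python 2, matching this codebase
except ImportError:
  import queue           # Python 3

THREAD_COUNT = 4
task_queue = queue.Queue()
worker_semaphore = threading.BoundedSemaphore(THREAD_COUNT)

def worker():
  while True:
    task = task_queue.get()
    if task is None:  # Shutdown sentinel; its slot was never acquired.
      return
    try:
      pass  # ... perform the task here ...
    finally:
      worker_semaphore.release()  # Frees a slot for the dispatcher.

threads = [threading.Thread(target=worker) for _ in range(THREAD_COUNT)]
for t in threads:
  t.start()

for task in range(20):
  worker_semaphore.acquire()  # Blocks while THREAD_COUNT tasks are in flight.
  task_queue.put(task)

for _ in threads:
  task_queue.put(None)
for t in threads:
  t.join()
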
       total_tasks[self.caller_id] = num_tasks
       if not cur_task:
         # This happens if there were zero arguments to be put in the queue.
         cur_task = Task(None, ZERO_TASKS_TO_DO_ARGUMENT, self.caller_id,
                         None, None, None, None)
       self.task_queue.put(cur_task)

       # It's possible that the workers finished before we updated total_tasks,
       # so we need to check here as well.
       _NotifyIfDone(self.caller_id,
-                    caller_id_finished_count.Get(self.caller_id))
+                    caller_id_finished_count.get(self.caller_id))


 class WorkerPool(object):
   """Pool of worker threads to which tasks can be added."""

-  def __init__(self, thread_count, logger, bucket_storage_uri_class=None,
-               gsutil_api_map=None, debug=0):
+  def __init__(self, thread_count, logger, worker_semaphore,
+               bucket_storage_uri_class=None, gsutil_api_map=None, debug=0):
     self.task_queue = _NewThreadsafeQueue()
     self.threads = []
     for _ in range(thread_count):
       worker_thread = WorkerThread(
-          self.task_queue, logger,
+          self.task_queue, logger, worker_semaphore=worker_semaphore,
           bucket_storage_uri_class=bucket_storage_uri_class,
           gsutil_api_map=gsutil_api_map, debug=debug)
       self.threads.append(worker_thread)
       worker_thread.start()

   def AddTask(self, task):
     self.task_queue.put(task)


 class WorkerThread(threading.Thread):
   """Thread where all the work will be performed.

   This makes the function calls for Apply and takes care of all error handling,
   return value propagation, and shared_vars.

   Note that this thread is NOT started upon instantiation because the function-
   calling logic is also used in the single-threaded case.
   """

-  def __init__(self, task_queue, logger, bucket_storage_uri_class=None,
-               gsutil_api_map=None, debug=0):
+  def __init__(self, task_queue, logger, worker_semaphore=None,
+               bucket_storage_uri_class=None, gsutil_api_map=None, debug=0):
     """Initializes the worker thread.

     Args:
       task_queue: The thread-safe queue from which this thread should obtain
                   its work.
       logger: Logger to use for this thread.
+      worker_semaphore: threading.BoundedSemaphore to be released each time a
+                        task is completed, or None for single-threaded
+                        execution.
       bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
                                 Settable for testing/mocking.
       gsutil_api_map: Map of providers and API selector tuples to api classes
                       which can be used to communicate with those providers.
                       Used for the instantiating CloudApiDelegator class.
       debug: debug level for the CloudApiDelegator class.
     """
     super(WorkerThread, self).__init__()
     self.task_queue = task_queue
+    self.worker_semaphore = worker_semaphore
     self.daemon = True
     self.cached_classes = {}
     self.shared_vars_updater = _SharedVariablesUpdater()

     self.thread_gsutil_api = None
     if bucket_storage_uri_class and gsutil_api_map:
       self.thread_gsutil_api = CloudApiDelegator(
           bucket_storage_uri_class, gsutil_api_map, logger, debug=debug)

   def PerformTask(self, task, cls):
     """Makes the function call for a task.

     Args:
       task: The Task to perform.
       cls: The instance of a class which gives context to the functions called
            by the Task's function. E.g., see SetAclFuncWrapper.
     """
     caller_id = task.caller_id
     try:
       results = task.func(cls, task.args, thread_state=self.thread_gsutil_api)
       if task.should_return_results:
-        global_return_values_map.Update(caller_id, [results], default_value=[])
+        global_return_values_map.Increment(caller_id, [results],
+                                           default_value=[])
     except Exception, e:  # pylint: disable=broad-except
       _IncrementFailureCount()
       if task.fail_on_error:
         raise  # Only happens for single thread and process case.
       else:
         try:
           task.exception_handler(cls, e)
         except Exception, _:  # pylint: disable=broad-except
           # Don't allow callers to raise exceptions here and kill the worker
           # threads.
           cls.logger.debug(
               'Caught exception while handling exception for %s:\n%s',
               task, traceback.format_exc())
     finally:
+      if self.worker_semaphore:
+        self.worker_semaphore.release()
       self.shared_vars_updater.Update(caller_id, cls)

       # Even if we encounter an exception, we still need to claim that the
       # function finished executing. Otherwise, we won't know when to
       # stop waiting and return results.
-      num_done = caller_id_finished_count.Update(caller_id, 1)
-
-      if cls.multiprocessing_is_available:
-        _NotifyIfDone(caller_id, num_done)
+      num_done = caller_id_finished_count.Increment(caller_id, 1)
+      _NotifyIfDone(caller_id, num_done)

   def run(self):
     while True:
       task = self.task_queue.get()
       caller_id = task.caller_id

       # Get the instance of the command with the appropriate context.
       cls = self.cached_classes.get(caller_id, None)
       if not cls:
         cls = copy.copy(class_map[caller_id])
(...skipping 39 matching lines...)
       key = (caller_id, name)
       last_value = self.last_shared_var_values.get(key, 0)
       # Compute the change made since the last time we updated here. This is
       # calculated by simply subtracting the last known value from the current
       # value in the class instance.
       delta = getattr(cls, name) - last_value
       self.last_shared_var_values[key] = delta + last_value

       # Update the globally-consistent value by simply increasing it by the
       # computed delta.
-      shared_vars_map.Update(key, delta)
+      shared_vars_map.Increment(key, delta)


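Each WorkerThread publishes only deltas to the global map, remembering the last value it saw per (caller_id, name) key, so concurrent updates from many threads compose additively. A minimal sketch of that updater, assuming the AtomicDict-style Increment shown earlier (the real class is _SharedVariablesUpdater in this file):

class SharedVariablesUpdaterSketch(object):
  """Hypothetical stand-in for _SharedVariablesUpdater above."""

  def __init__(self):
    self.last_shared_var_values = {}

  def Update(self, caller_id, cls, shared_vars_map, shared_attrs):
    for name in shared_attrs:
      key = (caller_id, name)
      last_value = self.last_shared_var_values.get(key, 0)
      # Publish only the change since this thread's previous update, so
      # concurrent contributions from other threads compose additively.
      delta = getattr(cls, name) - last_value
      self.last_shared_var_values[key] = last_value + delta
      shared_vars_map.Increment(key, delta)
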
 def _NotifyIfDone(caller_id, num_done):
   """Notify any threads waiting for results that something has finished.

   Each waiting thread will then need to check the call_completed_map to see if
   its work is done.

   Note that num_done could be calculated here, but it is passed in as an
   optimization so that we have one less call to a globally-locked data
(...skipping 46 matching lines...)
 def ResetFailureCount():
   """Resets the failure_count variable to 0 - useful if error is expected."""
   try:
     global failure_count
     if isinstance(failure_count, int):
       failure_count = 0
     else:  # It's a multiprocessing.Value() of type 'i'.
       failure_count = multiprocessing.Value('i', 0)
   except NameError:  # If it wasn't initialized, Apply() wasn't called.
     pass
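failure_count follows the same dual representation as caller_id_counter: a plain int under threading and a multiprocessing.Value under processes. The diff calls _IncrementFailureCount, whose body falls in the elided region above; a hedged sketch consistent with ResetFailureCount:

def _IncrementFailureCount():
  global failure_count
  if isinstance(failure_count, int):
    failure_count += 1  # Threading mode: plain int.
  else:
    # Multiprocessing mode: failure_count is a multiprocessing.Value('i', ...),
    # which carries its own lock.
    with failure_count.get_lock():
      failure_count.value += 1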