OLD | NEW |
1 # -*- coding: utf-8 -*- | 1 # -*- coding: utf-8 -*- |
2 # Copyright 2010 Google Inc. All Rights Reserved. | 2 # Copyright 2010 Google Inc. All Rights Reserved. |
3 # | 3 # |
4 # Licensed under the Apache License, Version 2.0 (the "License"); | 4 # Licensed under the Apache License, Version 2.0 (the "License"); |
5 # you may not use this file except in compliance with the License. | 5 # you may not use this file except in compliance with the License. |
6 # You may obtain a copy of the License at | 6 # You may obtain a copy of the License at |
7 # | 7 # |
8 # http://www.apache.org/licenses/LICENSE-2.0 | 8 # http://www.apache.org/licenses/LICENSE-2.0 |
9 # | 9 # |
10 # Unless required by applicable law or agreed to in writing, software | 10 # Unless required by applicable law or agreed to in writing, software |
(...skipping 32 matching lines...)
43 from gslib.cloud_api import AccessDeniedException | 43 from gslib.cloud_api import AccessDeniedException |
44 from gslib.cloud_api import ArgumentException | 44 from gslib.cloud_api import ArgumentException |
45 from gslib.cloud_api import ServiceException | 45 from gslib.cloud_api import ServiceException |
46 from gslib.cloud_api_delegator import CloudApiDelegator | 46 from gslib.cloud_api_delegator import CloudApiDelegator |
47 from gslib.cs_api_map import ApiSelector | 47 from gslib.cs_api_map import ApiSelector |
48 from gslib.cs_api_map import GsutilApiMapFactory | 48 from gslib.cs_api_map import GsutilApiMapFactory |
49 from gslib.exception import CommandException | 49 from gslib.exception import CommandException |
50 from gslib.help_provider import HelpProvider | 50 from gslib.help_provider import HelpProvider |
51 from gslib.name_expansion import NameExpansionIterator | 51 from gslib.name_expansion import NameExpansionIterator |
52 from gslib.name_expansion import NameExpansionResult | 52 from gslib.name_expansion import NameExpansionResult |
53 from gslib.parallelism_framework_util import AtomicIncrementDict | 53 from gslib.parallelism_framework_util import AtomicDict |
54 from gslib.parallelism_framework_util import BasicIncrementDict | |
55 from gslib.parallelism_framework_util import ThreadAndProcessSafeDict | |
56 from gslib.plurality_checkable_iterator import PluralityCheckableIterator | 54 from gslib.plurality_checkable_iterator import PluralityCheckableIterator |
57 from gslib.sig_handling import RegisterSignalHandler | 55 from gslib.sig_handling import RegisterSignalHandler |
58 from gslib.storage_url import StorageUrlFromString | 56 from gslib.storage_url import StorageUrlFromString |
59 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages | 57 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages |
60 from gslib.translation_helper import AclTranslation | 58 from gslib.translation_helper import AclTranslation |
| 59 from gslib.translation_helper import PRIVATE_DEFAULT_OBJ_ACL |
| 60 from gslib.util import CheckMultiprocessingAvailableAndInit |
61 from gslib.util import GetConfigFilePath | 61 from gslib.util import GetConfigFilePath |
62 from gslib.util import GsutilStreamHandler | 62 from gslib.util import GsutilStreamHandler |
63 from gslib.util import HaveFileUrls | 63 from gslib.util import HaveFileUrls |
64 from gslib.util import HaveProviderUrls | 64 from gslib.util import HaveProviderUrls |
65 from gslib.util import IS_WINDOWS | 65 from gslib.util import IS_WINDOWS |
66 from gslib.util import MultiprocessingIsAvailable | |
67 from gslib.util import NO_MAX | 66 from gslib.util import NO_MAX |
68 from gslib.util import UrlsAreForSingleProvider | 67 from gslib.util import UrlsAreForSingleProvider |
69 from gslib.util import UTF8 | 68 from gslib.util import UTF8 |
70 from gslib.wildcard_iterator import CreateWildcardIterator | 69 from gslib.wildcard_iterator import CreateWildcardIterator |
71 | 70 |
72 OFFER_GSUTIL_M_SUGGESTION_THRESHOLD = 5 | 71 OFFER_GSUTIL_M_SUGGESTION_THRESHOLD = 5 |
73 | 72 |
74 if IS_WINDOWS: | 73 if IS_WINDOWS: |
75 import ctypes # pylint: disable=g-import-not-at-top | 74 import ctypes # pylint: disable=g-import-not-at-top |
76 | 75 |
(...skipping 137 matching lines...)
214 | 213 |
215 # List of all existing task queues - used by all pools to find the queue | 214 # List of all existing task queues - used by all pools to find the queue |
216 # that's appropriate for the given recursive_apply_level. | 215 # that's appropriate for the given recursive_apply_level. |
217 task_queues = [] | 216 task_queues = [] |
218 | 217 |
219 # Used to assign a globally unique caller ID to each Apply call. | 218 # Used to assign a globally unique caller ID to each Apply call. |
220 caller_id_lock = manager.Lock() | 219 caller_id_lock = manager.Lock() |
221 caller_id_counter = multiprocessing.Value('i', 0) | 220 caller_id_counter = multiprocessing.Value('i', 0) |
222 | 221 |
223 # Map from caller_id to total number of tasks to be completed for that ID. | 222 # Map from caller_id to total number of tasks to be completed for that ID. |
224 total_tasks = ThreadAndProcessSafeDict(manager) | 223 total_tasks = AtomicDict(manager=manager) |
225 | 224 |
226 # Map from caller_id to a boolean which is True iff all its tasks are | 225 # Map from caller_id to a boolean which is True iff all its tasks are |
227 # finished. | 226 # finished. |
228 call_completed_map = ThreadAndProcessSafeDict(manager) | 227 call_completed_map = AtomicDict(manager=manager) |
229 | 228 |
230 # Used to keep track of the set of return values for each caller ID. | 229 # Used to keep track of the set of return values for each caller ID. |
231 global_return_values_map = AtomicIncrementDict(manager) | 230 global_return_values_map = AtomicDict(manager=manager) |
232 | 231 |
233 # Condition used to notify any waiting threads that a task has finished or | 232 # Condition used to notify any waiting threads that a task has finished or |
234 # that a call to Apply needs a new set of consumer processes. | 233 # that a call to Apply needs a new set of consumer processes. |
235 need_pool_or_done_cond = manager.Condition() | 234 need_pool_or_done_cond = manager.Condition() |
236 | 235 |
237 # Lock used to prevent multiple worker processes from asking the main thread | 236 # Lock used to prevent multiple worker processes from asking the main thread |
238 # to create a new consumer pool for the same level. | 237 # to create a new consumer pool for the same level. |
239 worker_checking_level_lock = manager.Lock() | 238 worker_checking_level_lock = manager.Lock() |
240 | 239 |
241 # Map from caller_id to the current number of completed tasks for that ID. | 240 # Map from caller_id to the current number of completed tasks for that ID. |
242 caller_id_finished_count = AtomicIncrementDict(manager) | 241 caller_id_finished_count = AtomicDict(manager=manager) |
243 | 242 |
244 # Used as a way for the main thread to distinguish between being woken up | 243 # Used as a way for the main thread to distinguish between being woken up |
245 # by another call finishing and being woken up by a call that needs a new set | 244 # by another call finishing and being woken up by a call that needs a new set |
246 # of consumer processes. | 245 # of consumer processes. |
247 new_pool_needed = multiprocessing.Value('i', 0) | 246 new_pool_needed = multiprocessing.Value('i', 0) |
248 | 247 |
249 current_max_recursive_level = multiprocessing.Value('i', 0) | 248 current_max_recursive_level = multiprocessing.Value('i', 0) |
250 | 249 |
251 # Map from (caller_id, name) to the value of that shared variable. | 250 # Map from (caller_id, name) to the value of that shared variable. |
252 shared_vars_map = AtomicIncrementDict(manager) | 251 shared_vars_map = AtomicDict(manager=manager) |
253 shared_vars_list_map = ThreadAndProcessSafeDict(manager) | 252 shared_vars_list_map = AtomicDict(manager=manager) |
254 | 253 |
255 # Map from caller_id to calling class. | 254 # Map from caller_id to calling class. |
256 class_map = manager.dict() | 255 class_map = manager.dict() |
257 | 256 |
258 # Number of tasks that resulted in an exception in calls to Apply(). | 257 # Number of tasks that resulted in an exception in calls to Apply(). |
259 failure_count = multiprocessing.Value('i', 0) | 258 failure_count = multiprocessing.Value('i', 0) |
260 | 259 |
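Editor's note: the NEW column collapses ThreadAndProcessSafeDict, AtomicIncrementDict, and BasicIncrementDict into a single AtomicDict that is process-safe when handed a manager and merely thread-safe otherwise. A minimal sketch of that idea, hedged: gslib's actual class lives in parallelism_framework_util and may differ in detail.

    import threading

    class AtomicDictSketch(object):
      """Illustrative stand-in for gslib's AtomicDict; an assumption, not the real code."""

      def __init__(self, manager=None):
        if manager:
          # Cross-process backing store and lock from a multiprocessing.Manager().
          self.lock = manager.Lock()
          self.vals = manager.dict()
        else:
          # Thread-safe analogs for single-process (e.g. Windows) execution.
          self.lock = threading.Lock()
          self.vals = {}

      def __getitem__(self, key):
        with self.lock:
          return self.vals[key]

      def __setitem__(self, key, value):
        with self.lock:
          self.vals[key] = value

      def get(self, key, default=None):
        with self.lock:
          return self.vals.get(key, default)

      def Increment(self, key, inc, default_value=0):
        # Atomic read-modify-write; '+' covers ints and list concatenation alike,
        # matching calls such as Increment(caller_id, [results], default_value=[]).
        with self.lock:
          val = self.vals.get(key, default_value) + inc
          self.vals[key] = val
          return val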
261 | 260 |
| 261 def InitializeThreadingVariables(): |
| 262 """Initializes module-level variables used when running multi-threaded. |
| 263 |
| 264 When multiprocessing is not available (or on Windows where only 1 process |
| 265 is used), thread-safe analogs to the multiprocessing global variables |
| 266 must be initialized. This function is the thread-safe analog to |
| 267 InitializeMultiprocessingVariables. |
| 268 """ |
| 269 # pylint: disable=global-variable-undefined |
| 270 global global_return_values_map, shared_vars_map, failure_count |
| 271 global caller_id_finished_count, shared_vars_list_map, total_tasks |
| 272 global need_pool_or_done_cond, call_completed_map, class_map |
| 273 global task_queues, caller_id_lock, caller_id_counter |
| 274 caller_id_counter = 0 |
| 275 caller_id_finished_count = AtomicDict() |
| 276 caller_id_lock = threading.Lock() |
| 277 call_completed_map = AtomicDict() |
| 278 class_map = AtomicDict() |
| 279 failure_count = 0 |
| 280 global_return_values_map = AtomicDict() |
| 281 need_pool_or_done_cond = threading.Condition() |
| 282 shared_vars_list_map = AtomicDict() |
| 283 shared_vars_map = AtomicDict() |
| 284 task_queues = [] |
| 285 total_tasks = AtomicDict() |
| 286 |
| 287 |
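Editor's note: a caller would then pick the matching initializer from the CheckMultiprocessingAvailableAndInit() result this change introduces. The dispatcher below is hypothetical; the real call site is outside this hunk.

    from gslib.util import CheckMultiprocessingAvailableAndInit

    def InitializeParallelismFramework():
      # Hypothetical wrapper: manager-backed state when true multiprocessing
      # works, plain ints and threading primitives otherwise (e.g. on Windows).
      if CheckMultiprocessingAvailableAndInit().is_available:
        InitializeMultiprocessingVariables()
      else:
        InitializeThreadingVariables()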
262 # Each subclass of Command must define a property named 'command_spec' that is | 288 # Each subclass of Command must define a property named 'command_spec' that is |
263 # an instance of the following class. | 289 # an instance of the following class. |
264 CommandSpec = namedtuple('CommandSpec', [ | 290 CommandSpec = namedtuple('CommandSpec', [ |
265 # Name of command. | 291 # Name of command. |
266 'command_name', | 292 'command_name', |
267 # Usage synopsis. | 293 # Usage synopsis. |
268 'usage_synopsis', | 294 'usage_synopsis', |
269 # List of command name aliases. | 295 # List of command name aliases. |
270 'command_name_aliases', | 296 'command_name_aliases', |
271 # Min number of args required by this command. | 297 # Min number of args required by this command. |
(...skipping 80 matching lines...)
352 args = new_command_args[1:] + args | 378 args = new_command_args[1:] + args |
353 self.logger.warn('\n'.join(textwrap.wrap( | 379 self.logger.warn('\n'.join(textwrap.wrap( |
354 ('You are using a deprecated alias, "%(used_alias)s", for the ' | 380 ('You are using a deprecated alias, "%(used_alias)s", for the ' |
355 '"%(command_name)s" command. This will stop working on 9/9/2014. ' | 381 '"%(command_name)s" command. This will stop working on 9/9/2014. ' |
356 'Please use "%(command_name)s" with the appropriate sub-command in ' | 382 'Please use "%(command_name)s" with the appropriate sub-command in ' |
357 'the future. See "gsutil help %(command_name)s" for details.') % | 383 'the future. See "gsutil help %(command_name)s" for details.') % |
358 {'used_alias': self.command_alias_used, | 384 {'used_alias': self.command_alias_used, |
359 'command_name': self.command_name}))) | 385 'command_name': self.command_name}))) |
360 return args | 386 return args |
361 | 387 |
362 def __init__(self, command_runner, args, headers, debug, parallel_operations, | 388 def __init__(self, command_runner, args, headers, debug, trace_token, |
363 bucket_storage_uri_class, gsutil_api_class_map_factory, | 389 parallel_operations, bucket_storage_uri_class, |
364 test_method=None, logging_filters=None, | 390 gsutil_api_class_map_factory, logging_filters=None, |
365 command_alias_used=None): | 391 command_alias_used=None): |
366 """Instantiates a Command. | 392 """Instantiates a Command. |
367 | 393 |
368 Args: | 394 Args: |
369 command_runner: CommandRunner (for commands built atop other commands). | 395 command_runner: CommandRunner (for commands built atop other commands). |
370 args: Command-line args (arg0 = actual arg, not command name ala bash). | 396 args: Command-line args (arg0 = actual arg, not command name ala bash). |
371 headers: Dictionary containing optional HTTP headers to pass to boto. | 397 headers: Dictionary containing optional HTTP headers to pass to boto. |
372 debug: Debug level to pass in to boto connection (range 0..3). | 398 debug: Debug level to pass in to boto connection (range 0..3). |
| 399 trace_token: Trace token to pass to the API implementation. |
373 parallel_operations: Should command operations be executed in parallel? | 400 parallel_operations: Should command operations be executed in parallel? |
374 bucket_storage_uri_class: Class to instantiate for cloud StorageUris. | 401 bucket_storage_uri_class: Class to instantiate for cloud StorageUris. |
375 Settable for testing/mocking. | 402 Settable for testing/mocking. |
376 gsutil_api_class_map_factory: Creates map of cloud storage interfaces. | 403 gsutil_api_class_map_factory: Creates map of cloud storage interfaces. |
377 Settable for testing/mocking. | 404 Settable for testing/mocking. |
378 test_method: Optional general purpose method for testing purposes. | 405 logging_filters: Optional list of logging.Filters to apply to this
379 Application and semantics of this method will vary by | |
380 command and test type. | |
381 logging_filters: Optional list of logging.Filters to apply to this | |
382 command's logger. | 406 command's logger. |
383 command_alias_used: The alias that was actually used when running this | 407 command_alias_used: The alias that was actually used when running this |
384 command (as opposed to the "official" command name, | 408 command (as opposed to the "official" command name, |
385 which will always correspond to the file name). | 409 which will always correspond to the file name). |
386 | 410 |
387 Implementation note: subclasses shouldn't need to define an __init__ | 411 Implementation note: subclasses shouldn't need to define an __init__ |
388 method, and instead depend on the shared initialization that happens | 412 method, and instead depend on the shared initialization that happens |
389 here. If you do define an __init__ method in a subclass you'll need to | 413 here. If you do define an __init__ method in a subclass you'll need to |
390 explicitly call super().__init__(). But you're encouraged not to do this, | 414 explicitly call super().__init__(). But you're encouraged not to do this, |
391 because it will make changing the __init__ interface more painful. | 415 because it will make changing the __init__ interface more painful. |
392 """ | 416 """ |
393 # Save class values from constructor params. | 417 # Save class values from constructor params. |
394 self.command_runner = command_runner | 418 self.command_runner = command_runner |
395 self.unparsed_args = args | 419 self.unparsed_args = args |
396 self.headers = headers | 420 self.headers = headers |
397 self.debug = debug | 421 self.debug = debug |
| 422 self.trace_token = trace_token |
398 self.parallel_operations = parallel_operations | 423 self.parallel_operations = parallel_operations |
399 self.bucket_storage_uri_class = bucket_storage_uri_class | 424 self.bucket_storage_uri_class = bucket_storage_uri_class |
400 self.gsutil_api_class_map_factory = gsutil_api_class_map_factory | 425 self.gsutil_api_class_map_factory = gsutil_api_class_map_factory |
401 self.test_method = test_method | |
402 self.exclude_symlinks = False | 426 self.exclude_symlinks = False |
403 self.recursion_requested = False | 427 self.recursion_requested = False |
404 self.all_versions = False | 428 self.all_versions = False |
405 self.command_alias_used = command_alias_used | 429 self.command_alias_used = command_alias_used |
406 | 430 |
407 # Global instance of a threaded logger object. | 431 # Global instance of a threaded logger object. |
408 self.logger = CreateGsutilLogger(self.command_name) | 432 self.logger = CreateGsutilLogger(self.command_name) |
409 if logging_filters: | 433 if logging_filters: |
410 for log_filter in logging_filters: | 434 for log_filter in logging_filters: |
411 self.logger.addFilter(log_filter) | 435 self.logger.addFilter(log_filter) |
(...skipping 26 matching lines...)
438 default_map = { | 462 default_map = { |
439 'gs': self.command_spec.gs_default_api, | 463 'gs': self.command_spec.gs_default_api, |
440 's3': ApiSelector.XML | 464 's3': ApiSelector.XML |
441 } | 465 } |
442 self.gsutil_api_map = GsutilApiMapFactory.GetApiMap( | 466 self.gsutil_api_map = GsutilApiMapFactory.GetApiMap( |
443 self.gsutil_api_class_map_factory, support_map, default_map) | 467 self.gsutil_api_class_map_factory, support_map, default_map) |
444 | 468 |
445 self.project_id = None | 469 self.project_id = None |
446 self.gsutil_api = CloudApiDelegator( | 470 self.gsutil_api = CloudApiDelegator( |
447 bucket_storage_uri_class, self.gsutil_api_map, | 471 bucket_storage_uri_class, self.gsutil_api_map, |
448 self.logger, debug=self.debug) | 472 self.logger, debug=self.debug, trace_token=self.trace_token) |
449 | 473 |
450 # Cross-platform path to run gsutil binary. | 474 # Cross-platform path to run gsutil binary. |
451 self.gsutil_cmd = '' | 475 self.gsutil_cmd = '' |
452 # If running on Windows, invoke python interpreter explicitly. | 476 # If running on Windows, invoke python interpreter explicitly. |
453 if gslib.util.IS_WINDOWS: | 477 if gslib.util.IS_WINDOWS: |
454 self.gsutil_cmd += 'python ' | 478 self.gsutil_cmd += 'python ' |
455 # Add full path to gsutil to make sure we test the correct version. | 479 # Add full path to gsutil to make sure we test the correct version. |
456 self.gsutil_path = gslib.GSUTIL_PATH | 480 self.gsutil_path = gslib.GSUTIL_PATH |
457 self.gsutil_cmd += self.gsutil_path | 481 self.gsutil_cmd += self.gsutil_path |
458 | 482 |
459 # We're treating recursion_requested like it's used by all commands, but | 483 # We're treating recursion_requested like it's used by all commands, but |
460 # only some of the commands accept the -R option. | 484 # only some of the commands accept the -R option. |
461 if self.sub_opts: | 485 if self.sub_opts: |
462 for o, unused_a in self.sub_opts: | 486 for o, unused_a in self.sub_opts: |
463 if o == '-r' or o == '-R': | 487 if o == '-r' or o == '-R': |
464 self.recursion_requested = True | 488 self.recursion_requested = True |
465 break | 489 break |
466 | 490 |
467 self.multiprocessing_is_available = MultiprocessingIsAvailable()[0] | 491 self.multiprocessing_is_available = ( |
| 492 CheckMultiprocessingAvailableAndInit().is_available) |
468 | 493 |
469 def RaiseWrongNumberOfArgumentsException(self): | 494 def RaiseWrongNumberOfArgumentsException(self): |
470 """Raises exception for wrong number of arguments supplied to command.""" | 495 """Raises exception for wrong number of arguments supplied to command.""" |
471 if len(self.args) < self.command_spec.min_args: | 496 if len(self.args) < self.command_spec.min_args: |
472 tail_str = 's' if self.command_spec.min_args > 1 else '' | 497 tail_str = 's' if self.command_spec.min_args > 1 else '' |
473 message = ('The %s command requires at least %d argument%s.' % | 498 message = ('The %s command requires at least %d argument%s.' % |
474 (self.command_name, self.command_spec.min_args, tail_str)) | 499 (self.command_name, self.command_spec.min_args, tail_str)) |
475 else: | 500 else: |
476 message = ('The %s command accepts at most %d arguments.' % | 501 message = ('The %s command accepts at most %d arguments.' % |
477 (self.command_name, self.command_spec.max_args)) | 502 (self.command_name, self.command_spec.max_args)) |
(...skipping 204 matching lines...)
682 try: | 707 try: |
683 if url.IsBucket(): | 708 if url.IsBucket(): |
684 if self.def_acl: | 709 if self.def_acl: |
685 if self.canned: | 710 if self.canned: |
686 gsutil_api.PatchBucket( | 711 gsutil_api.PatchBucket( |
687 url.bucket_name, apitools_messages.Bucket(), | 712 url.bucket_name, apitools_messages.Bucket(), |
688 canned_def_acl=self.acl_arg, provider=url.scheme, fields=['id']) | 713 canned_def_acl=self.acl_arg, provider=url.scheme, fields=['id']) |
689 else: | 714 else: |
690 def_obj_acl = AclTranslation.JsonToMessage( | 715 def_obj_acl = AclTranslation.JsonToMessage( |
691 self.acl_arg, apitools_messages.ObjectAccessControl) | 716 self.acl_arg, apitools_messages.ObjectAccessControl) |
| 717 if not def_obj_acl: |
| 718 # Use a sentinel value to indicate a private (no entries) default |
| 719 # object ACL. |
| 720 def_obj_acl.append(PRIVATE_DEFAULT_OBJ_ACL) |
692 bucket_metadata = apitools_messages.Bucket( | 721 bucket_metadata = apitools_messages.Bucket( |
693 defaultObjectAcl=def_obj_acl) | 722 defaultObjectAcl=def_obj_acl) |
694 gsutil_api.PatchBucket(url.bucket_name, bucket_metadata, | 723 gsutil_api.PatchBucket(url.bucket_name, bucket_metadata, |
695 provider=url.scheme, fields=['id']) | 724 provider=url.scheme, fields=['id']) |
696 else: | 725 else: |
697 if self.canned: | 726 if self.canned: |
698 gsutil_api.PatchBucket( | 727 gsutil_api.PatchBucket( |
699 url.bucket_name, apitools_messages.Bucket(), | 728 url.bucket_name, apitools_messages.Bucket(), |
700 canned_acl=self.acl_arg, provider=url.scheme, fields=['id']) | 729 canned_acl=self.acl_arg, provider=url.scheme, fields=['id']) |
701 else: | 730 else: |
(...skipping 40 matching lines...)
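Editor's note: the sentinel append in the NEW column guards a subtle case. A defaultObjectAcl that parses to an empty list would look like "field unset" in the PATCH body, presumably leaving the bucket's previous default ACL in place rather than making it private. An annotated restatement of the pattern, using names imported at the top of this file:

    def_obj_acl = AclTranslation.JsonToMessage(
        self.acl_arg, apitools_messages.ObjectAccessControl)
    if not def_obj_acl:
      # An empty ACL list could be dropped from the request, silently keeping
      # the old default object ACL. PRIVATE_DEFAULT_OBJ_ACL is a marker entry
      # that downstream translation code is assumed to turn back into "private".
      def_obj_acl.append(PRIVATE_DEFAULT_OBJ_ACL)
    bucket_metadata = apitools_messages.Bucket(defaultObjectAcl=def_obj_acl)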
742 self.command_name) | 771 self.command_name) |
743 | 772 |
744 # Determine whether acl_arg names a file containing XML ACL text vs. the | 773 # Determine whether acl_arg names a file containing XML ACL text vs. the |
745 # string name of a canned ACL. | 774 # string name of a canned ACL. |
746 if os.path.isfile(acl_arg): | 775 if os.path.isfile(acl_arg): |
747 with codecs.open(acl_arg, 'r', UTF8) as f: | 776 with codecs.open(acl_arg, 'r', UTF8) as f: |
748 acl_arg = f.read() | 777 acl_arg = f.read() |
749 self.canned = False | 778 self.canned = False |
750 else: | 779 else: |
751 # No file exists, so expect a canned ACL string. | 780 # No file exists, so expect a canned ACL string. |
752 # Canned ACLs are not supported in JSON and we need to use the XML API | |
753 # to set them. | |
754 # validate=False because we allow wildcard urls. | 781 # validate=False because we allow wildcard urls. |
755 storage_uri = boto.storage_uri( | 782 storage_uri = boto.storage_uri( |
756 url_args[0], debug=self.debug, validate=False, | 783 url_args[0], debug=self.debug, validate=False, |
757 bucket_storage_uri_class=self.bucket_storage_uri_class) | 784 bucket_storage_uri_class=self.bucket_storage_uri_class) |
758 | 785 |
759 canned_acls = storage_uri.canned_acls() | 786 canned_acls = storage_uri.canned_acls() |
760 if acl_arg not in canned_acls: | 787 if acl_arg not in canned_acls: |
761 raise CommandException('Invalid canned ACL "%s".' % acl_arg) | 788 raise CommandException('Invalid canned ACL "%s".' % acl_arg) |
762 self.canned = True | 789 self.canned = True |
763 | 790 |
(...skipping 95 matching lines...)
859 self.WildcardIterator(url_str).IterBuckets( | 886 self.WildcardIterator(url_str).IterBuckets( |
860 bucket_fields=bucket_fields)) | 887 bucket_fields=bucket_fields)) |
861 if plurality_iter.IsEmpty(): | 888 if plurality_iter.IsEmpty(): |
862 raise CommandException('No URLs matched') | 889 raise CommandException('No URLs matched') |
863 if plurality_iter.HasPlurality(): | 890 if plurality_iter.HasPlurality(): |
864 raise CommandException( | 891 raise CommandException( |
865 '%s matched more than one URL, which is not allowed by the %s ' | 892 '%s matched more than one URL, which is not allowed by the %s ' |
866 'command' % (url_str, self.command_name)) | 893 'command' % (url_str, self.command_name)) |
867 return list(plurality_iter)[0] | 894 return list(plurality_iter)[0] |
868 | 895 |
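Editor's note: this helper relies on PluralityCheckableIterator to answer IsEmpty() and HasPlurality() without consuming results. A minimal illustrative version assuming only the Python 2 iterator protocol; gslib's real class also buffers exceptions raised during iteration.

    class PluralityCheckableIteratorSketch(object):
      """Buffers up to two elements so emptiness/plurality checks are non-destructive."""

      def __init__(self, it):
        self.it = iter(it)
        self.buffer = []

      def _Fill(self, n):
        # Pull elements into the buffer until we have n (or the iterator ends).
        while len(self.buffer) < n:
          try:
            self.buffer.append(self.it.next())
          except StopIteration:
            return False
        return True

      def IsEmpty(self):
        return not self._Fill(1)

      def HasPlurality(self):
        return self._Fill(2)

      def __iter__(self):
        return self

      def next(self):
        # Drain the peek buffer before touching the underlying iterator.
        if self.buffer:
          return self.buffer.pop(0)
        return self.it.next()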
869 def _HandleMultiProcessingSigs(self, unused_signal_num, | 896 def _HandleMultiProcessingSigs(self, signal_num, unused_cur_stack_frame): |
870 unused_cur_stack_frame): | |
871 """Handles signals INT AND TERM during a multi-process/multi-thread request. | 897 """Handles signals INT AND TERM during a multi-process/multi-thread request. |
872 | 898 |
873 Kills subprocesses. | 899 Kills subprocesses. |
874 | 900 |
875 Args: | 901 Args: |
876 unused_signal_num: signal generated by ^C. | 902 signal_num: Signal that was caught (SIGINT or SIGTERM). |
877 unused_cur_stack_frame: Current stack frame. | 903 unused_cur_stack_frame: Current stack frame. |
878 """ | 904 """ |
879 # Note: This only works under Linux/MacOS. See | 905 # Note: This only works under Linux/MacOS. See |
880 # https://github.com/GoogleCloudPlatform/gsutil/issues/99 for details | 906 # https://github.com/GoogleCloudPlatform/gsutil/issues/99 for details |
881 # about why making it work correctly across OS's is harder and still open. | 907 # about why making it work correctly across OS's is harder and still open. |
882 ShutDownGsutil() | 908 ShutDownGsutil() |
883 sys.stderr.write('Caught ^C - exiting\n') | 909 if signal_num == signal.SIGINT: |
| 910 sys.stderr.write('Caught ^C - exiting\n') |
884 # Simply calling sys.exit(1) doesn't work - see above bug for details. | 911 # Simply calling sys.exit(1) doesn't work - see above bug for details. |
885 KillProcess(os.getpid()) | 912 KillProcess(os.getpid()) |
886 | 913 |
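Editor's note: with the NEW signature, the two signals differ only in messaging; both paths kill subprocesses via this file's ShutDownGsutil() and then hard-kill the current process, since sys.exit(1) is unreliable here. A toy registration using the stdlib directly (gsutil itself goes through RegisterSignalHandler, shown further down):

    import os
    import signal
    import sys

    def _ToySigHandler(signal_num, unused_stack_frame):
      ShutDownGsutil()                 # gslib helper: kills child processes
      if signal_num == signal.SIGINT:
        sys.stderr.write('Caught ^C - exiting\n')
      KillProcess(os.getpid())         # sys.exit(1) doesn't work reliably here

    signal.signal(signal.SIGINT, _ToySigHandler)
    signal.signal(signal.SIGTERM, _ToySigHandler)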
887 def GetSingleBucketUrlFromArg(self, arg, bucket_fields=None): | 914 def GetSingleBucketUrlFromArg(self, arg, bucket_fields=None): |
888 """Gets a single bucket URL based on the command arguments. | 915 """Gets a single bucket URL based on the command arguments. |
889 | 916 |
890 Args: | 917 Args: |
891 arg: String argument to get bucket URL for. | 918 arg: String argument to get bucket URL for. |
892 bucket_fields: Fields to populate for the bucket. | 919 bucket_fields: Fields to populate for the bucket. |
893 | 920 |
(...skipping 101 matching lines...)
995 'update your config file (located at %s) and set ' | 1022 'update your config file (located at %s) and set ' |
996 '"parallel_process_count = 1".') % | 1023 '"parallel_process_count = 1".') % |
997 GetConfigFilePath()))) | 1024 GetConfigFilePath()))) |
998 self.logger.debug('process count: %d', process_count) | 1025 self.logger.debug('process count: %d', process_count) |
999 self.logger.debug('thread count: %d', thread_count) | 1026 self.logger.debug('thread count: %d', thread_count) |
1000 | 1027 |
1001 return (process_count, thread_count) | 1028 return (process_count, thread_count) |
1002 | 1029 |
1003 def _SetUpPerCallerState(self): | 1030 def _SetUpPerCallerState(self): |
1004 """Set up the state for a caller id, corresponding to one Apply call.""" | 1031 """Set up the state for a caller id, corresponding to one Apply call.""" |
| 1032 # pylint: disable=global-variable-undefined,global-variable-not-assigned |
| 1033 # These variables are initialized in InitializeMultiprocessingVariables or |
| 1034 # InitializeThreadingVariables |
| 1035 global global_return_values_map, shared_vars_map, failure_count |
| 1036 global caller_id_finished_count, shared_vars_list_map, total_tasks |
| 1037 global need_pool_or_done_cond, call_completed_map, class_map |
| 1038 global task_queues, caller_id_lock, caller_id_counter |
1005 # Get a new caller ID. | 1039 # Get a new caller ID. |
1006 with caller_id_lock: | 1040 with caller_id_lock: |
1007 caller_id_counter.value += 1 | 1041 if isinstance(caller_id_counter, int): |
1008 caller_id = caller_id_counter.value | 1042 caller_id_counter += 1 |
| 1043 caller_id = caller_id_counter |
| 1044 else: |
| 1045 caller_id_counter.value += 1 |
| 1046 caller_id = caller_id_counter.value |
1009 | 1047 |
1010 # Create a copy of self with an incremented recursive level. This allows | 1048 # Create a copy of self with an incremented recursive level. This allows |
1011 # the class to report its level correctly if the function called from it | 1049 # the class to report its level correctly if the function called from it |
1012 # also needs to call Apply. | 1050 # also needs to call Apply. |
1013 cls = copy.copy(self) | 1051 cls = copy.copy(self) |
1014 cls.recursive_apply_level += 1 | 1052 cls.recursive_apply_level += 1 |
1015 | 1053 |
1016 # Thread-safe loggers can't be pickled, so we will remove it here and | 1054 # Thread-safe loggers can't be pickled, so we will remove it here and |
1017 # recreate it later in the WorkerThread. This is not a problem since any | 1055 # recreate it later in the WorkerThread. This is not a problem since any |
1018 # logger with the same name will be treated as a singleton. | 1056 # logger with the same name will be treated as a singleton. |
1019 cls.logger = None | 1057 cls.logger = None |
1020 | 1058 |
1021 # Likewise, the default API connection can't be pickled, but it is unused | 1059 # Likewise, the default API connection can't be pickled, but it is unused |
1022 # anyway as each thread gets its own API delegator. | 1060 # anyway as each thread gets its own API delegator. |
1023 cls.gsutil_api = None | 1061 cls.gsutil_api = None |
1024 | 1062 |
1025 class_map[caller_id] = cls | 1063 class_map[caller_id] = cls |
1026 total_tasks[caller_id] = -1 # -1 => the producer hasn't finished yet. | 1064 total_tasks[caller_id] = -1 # -1 => the producer hasn't finished yet. |
1027 call_completed_map[caller_id] = False | 1065 call_completed_map[caller_id] = False |
1028 caller_id_finished_count.Put(caller_id, 0) | 1066 caller_id_finished_count[caller_id] = 0 |
1029 global_return_values_map.Put(caller_id, []) | 1067 global_return_values_map[caller_id] = [] |
1030 return caller_id | 1068 return caller_id |
1031 | 1069 |
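Editor's note: the isinstance branch above exists because caller_id_counter is a plain int after InitializeThreadingVariables but a multiprocessing.Value('i') after InitializeMultiprocessingVariables. The same dual-mode allocation in isolation (demo names; threading-mode lock shown, a manager.Lock() in multiprocessing mode):

    import threading

    counter_lock = threading.Lock()
    counter = 0                              # or multiprocessing.Value('i', 0)

    def NextCallerId():
      global counter
      with counter_lock:
        if isinstance(counter, int):         # threading mode
          counter += 1
          return counter
        counter.value += 1                   # multiprocessing.Value mode
        return counter.value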
1032 def _CreateNewConsumerPool(self, num_processes, num_threads): | 1070 def _CreateNewConsumerPool(self, num_processes, num_threads): |
1033 """Create a new pool of processes that call _ApplyThreads.""" | 1071 """Create a new pool of processes that call _ApplyThreads.""" |
1034 processes = [] | 1072 processes = [] |
1035 task_queue = _NewMultiprocessingQueue() | 1073 task_queue = _NewMultiprocessingQueue() |
1036 task_queues.append(task_queue) | 1074 task_queues.append(task_queue) |
1037 | 1075 |
1038 current_max_recursive_level.value += 1 | 1076 current_max_recursive_level.value += 1 |
1039 if current_max_recursive_level.value > MAX_RECURSIVE_DEPTH: | 1077 if current_max_recursive_level.value > MAX_RECURSIVE_DEPTH: |
(...skipping 56 matching lines...)
1096 and self.sequential_caller_id == -1) | 1134 and self.sequential_caller_id == -1) |
1097 | 1135 |
1098 # We don't honor the fail_on_error flag in the case of multiple threads | 1136 # We don't honor the fail_on_error flag in the case of multiple threads |
1099 # or processes. | 1137 # or processes. |
1100 fail_on_error = fail_on_error and (process_count * thread_count == 1) | 1138 fail_on_error = fail_on_error and (process_count * thread_count == 1) |
1101 | 1139 |
1102 # Only check this from the first call in the main thread. Apart from the | 1140 # Only check this from the first call in the main thread. Apart from the |
1103 # fact that it's wasteful to try this multiple times in general, it also | 1141 # fact that it's wasteful to try this multiple times in general, it also |
1104 # will never work when called from a subprocess since we use daemon | 1142 # will never work when called from a subprocess since we use daemon |
1105 # processes, and daemons can't create other processes. | 1143 # processes, and daemons can't create other processes. |
1106 if is_main_thread: | 1144 if (is_main_thread and not self.multiprocessing_is_available and |
1107 if ((not self.multiprocessing_is_available) | 1145 process_count > 1): |
1108 and thread_count * process_count > 1): | 1146 # Run the check again and log the appropriate warnings. This was run |
1109 # Run the check again and log the appropriate warnings. This was run | 1147 # before, when the Command object was created, in order to calculate |
1110 # before, when the Command object was created, in order to calculate | 1148 # self.multiprocessing_is_available, but we don't want to print the |
1111 # self.multiprocessing_is_available, but we don't want to print the | 1149 # warning until we're sure the user actually tried to use multiple |
1112 # warning until we're sure the user actually tried to use multiple | 1150 # threads or processes. |
1113 # threads or processes. | 1151 CheckMultiprocessingAvailableAndInit(logger=self.logger) |
1114 MultiprocessingIsAvailable(logger=self.logger) | |
1115 | 1152 |
1116 if self.multiprocessing_is_available: | 1153 caller_id = self._SetUpPerCallerState() |
1117 caller_id = self._SetUpPerCallerState() | |
1118 else: | |
1119 self.sequential_caller_id += 1 | |
1120 caller_id = self.sequential_caller_id | |
1121 | |
1122 if is_main_thread: | |
1123 # pylint: disable=global-variable-undefined | |
1124 global global_return_values_map, shared_vars_map, failure_count | |
1125 global caller_id_finished_count, shared_vars_list_map | |
1126 global_return_values_map = BasicIncrementDict() | |
1127 global_return_values_map.Put(caller_id, []) | |
1128 shared_vars_map = BasicIncrementDict() | |
1129 caller_id_finished_count = BasicIncrementDict() | |
1130 shared_vars_list_map = {} | |
1131 failure_count = 0 | |
1132 | 1154 |
1133 # If any shared attributes passed by caller, create a dictionary of | 1155 # If any shared attributes passed by caller, create a dictionary of |
1134 # shared memory variables for every element in the list of shared | 1156 # shared memory variables for every element in the list of shared |
1135 # attributes. | 1157 # attributes. |
1136 if shared_attrs: | 1158 if shared_attrs: |
1137 shared_vars_list_map[caller_id] = shared_attrs | 1159 shared_vars_list_map[caller_id] = shared_attrs |
1138 for name in shared_attrs: | 1160 for name in shared_attrs: |
1139 shared_vars_map.Put((caller_id, name), 0) | 1161 shared_vars_map[(caller_id, name)] = 0 |
1140 | 1162 |
1141 # Make all of the requested function calls. | 1163 # Make all of the requested function calls. |
1142 if self.multiprocessing_is_available and thread_count * process_count > 1: | 1164 usable_processes_count = (process_count if self.multiprocessing_is_available |
| 1165 else 1) |
| 1166 if thread_count * usable_processes_count > 1: |
1143 self._ParallelApply(func, args_iterator, exception_handler, caller_id, | 1167 self._ParallelApply(func, args_iterator, exception_handler, caller_id, |
1144 arg_checker, process_count, thread_count, | 1168 arg_checker, usable_processes_count, thread_count, |
1145 should_return_results, fail_on_error) | 1169 should_return_results, fail_on_error) |
1146 else: | 1170 else: |
1147 self._SequentialApply(func, args_iterator, exception_handler, caller_id, | 1171 self._SequentialApply(func, args_iterator, exception_handler, caller_id, |
1148 arg_checker, should_return_results, fail_on_error) | 1172 arg_checker, should_return_results, fail_on_error) |
1149 | 1173 |
1150 if shared_attrs: | 1174 if shared_attrs: |
1151 for name in shared_attrs: | 1175 for name in shared_attrs: |
1152 # This allows us to retain the original value of the shared variable, | 1176 # This allows us to retain the original value of the shared variable, |
1153 # and simply apply the delta after what was done during the call to | 1177 # and simply apply the delta after what was done during the call to |
1154 # apply. | 1178 # apply. |
1155 final_value = (original_shared_vars_values[name] + | 1179 final_value = (original_shared_vars_values[name] + |
1156 shared_vars_map.Get((caller_id, name))) | 1180 shared_vars_map.get((caller_id, name))) |
1157 setattr(self, name, final_value) | 1181 setattr(self, name, final_value) |
1158 | 1182 |
1159 if should_return_results: | 1183 if should_return_results: |
1160 return global_return_values_map.Get(caller_id) | 1184 return global_return_values_map.get(caller_id) |
1161 | 1185 |
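Editor's note: the tail of Apply reconciles shared state. Workers accumulate only deltas under (caller_id, name) keys, so whatever value the attribute had before Apply ran survives. A worked example against this file's shared_vars_map, with a hypothetical shared attribute op_failure_count:

    # Before Apply: self.op_failure_count == 3 (hypothetical shared attribute).
    original_shared_vars_values = {'op_failure_count': 3}
    shared_vars_map[(caller_id, 'op_failure_count')] = 0           # reset delta
    shared_vars_map.Increment((caller_id, 'op_failure_count'), 1)  # worker A
    shared_vars_map.Increment((caller_id, 'op_failure_count'), 1)  # worker B
    final_value = (original_shared_vars_values['op_failure_count'] +
                   shared_vars_map.get((caller_id, 'op_failure_count')))
    # final_value == 5: the pre-Apply value 3 survives, plus the delta of 2.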
1162 def _MaybeSuggestGsutilDashM(self): | 1186 def _MaybeSuggestGsutilDashM(self): |
1163 """Outputs a sugestion to the user to use gsutil -m.""" | 1187 """Outputs a sugestion to the user to use gsutil -m.""" |
1164 if not (boto.config.getint('GSUtil', 'parallel_process_count', 0) == 1 and | 1188 if not (boto.config.getint('GSUtil', 'parallel_process_count', 0) == 1 and |
1165 boto.config.getint('GSUtil', 'parallel_thread_count', 0) == 1): | 1189 boto.config.getint('GSUtil', 'parallel_thread_count', 0) == 1): |
1166 self.logger.info('\n' + textwrap.fill( | 1190 self.logger.info('\n' + textwrap.fill( |
1167 '==> NOTE: You are performing a sequence of gsutil operations that ' | 1191 '==> NOTE: You are performing a sequence of gsutil operations that ' |
1168 'may run significantly faster if you instead use gsutil -m %s ...\n' | 1192 'may run significantly faster if you instead use gsutil -m %s ...\n' |
1169 'Please see the -m section under "gsutil help options" for further ' | 1193 'Please see the -m section under "gsutil help options" for further ' |
1170 'information about when gsutil -m can be advantageous.' | 1194 'information about when gsutil -m can be advantageous.' |
(...skipping 44 matching lines...)
1215 if arg_checker(self, args): | 1239 if arg_checker(self, args): |
1216 # Now that we actually have the next argument, perform the task. | 1240 # Now that we actually have the next argument, perform the task. |
1217 task = Task(func, args, caller_id, exception_handler, | 1241 task = Task(func, args, caller_id, exception_handler, |
1218 should_return_results, arg_checker, fail_on_error) | 1242 should_return_results, arg_checker, fail_on_error) |
1219 worker_thread.PerformTask(task, self) | 1243 worker_thread.PerformTask(task, self) |
1220 if sequential_call_count >= gslib.util.GetTermLines(): | 1244 if sequential_call_count >= gslib.util.GetTermLines(): |
1221 # Output suggestion at end of long run, in case user missed it at the | 1245 # Output suggestion at end of long run, in case user missed it at the |
1222 # start and it scrolled off-screen. | 1246 # start and it scrolled off-screen. |
1223 self._MaybeSuggestGsutilDashM() | 1247 self._MaybeSuggestGsutilDashM() |
1224 | 1248 |
| 1249 # If the final iterated argument results in an exception, and that |
| 1250 # exception modifies shared_attrs, we need to publish the results. |
| 1251 worker_thread.shared_vars_updater.Update(caller_id, self) |
| 1252 |
1225 # pylint: disable=g-doc-args | 1253 # pylint: disable=g-doc-args |
1226 def _ParallelApply(self, func, args_iterator, exception_handler, caller_id, | 1254 def _ParallelApply(self, func, args_iterator, exception_handler, caller_id, |
1227 arg_checker, process_count, thread_count, | 1255 arg_checker, process_count, thread_count, |
1228 should_return_results, fail_on_error): | 1256 should_return_results, fail_on_error): |
1229 """Dispatches input arguments across a thread/process pool. | 1257 """Dispatches input arguments across a thread/process pool. |
1230 | 1258 |
1231 Pools are composed of parallel OS processes and/or Python threads, | 1259 Pools are composed of parallel OS processes and/or Python threads, |
1232 based on options (-m or not) and settings in the user's config file. | 1260 based on options (-m or not) and settings in the user's config file. |
1233 | 1261 |
1234 If only one OS process is requested/available, dispatch requests across | 1262 If only one OS process is requested/available, dispatch requests across |
(...skipping 41 matching lines...)
1276 RegisterSignalHandler(signal.SIGINT, self._HandleMultiProcessingSigs, | 1304 RegisterSignalHandler(signal.SIGINT, self._HandleMultiProcessingSigs, |
1277 is_final_handler=True) | 1305 is_final_handler=True) |
1278 RegisterSignalHandler(signal.SIGTERM, self._HandleMultiProcessingSigs, | 1306 RegisterSignalHandler(signal.SIGTERM, self._HandleMultiProcessingSigs, |
1279 is_final_handler=True) | 1307 is_final_handler=True) |
1280 | 1308 |
1281 if not task_queues: | 1309 if not task_queues: |
1282 # The process we create will need to access the next recursive level | 1310 # The process we create will need to access the next recursive level |
1283 # of task queues if it makes a call to Apply, so we always keep around | 1311 # of task queues if it makes a call to Apply, so we always keep around |
1284 # one more queue than we know we need. OTOH, if we don't create a new | 1312 # one more queue than we know we need. OTOH, if we don't create a new |
1285 # process, the existing process still needs a task queue to use. | 1313 # process, the existing process still needs a task queue to use. |
1286 task_queues.append(_NewMultiprocessingQueue()) | 1314 if process_count > 1: |
| 1315 task_queues.append(_NewMultiprocessingQueue()) |
| 1316 else: |
| 1317 task_queues.append(_NewThreadsafeQueue()) |
1287 | 1318 |
1288 if process_count > 1: # Handle process pool creation. | 1319 if process_count > 1: # Handle process pool creation. |
1289 # Check whether this call will need a new set of workers. | 1320 # Check whether this call will need a new set of workers. |
1290 | 1321 |
1291 # Each worker must acquire a shared lock before notifying the main thread | 1322 # Each worker must acquire a shared lock before notifying the main thread |
1292 # that it needs a new worker pool, so that at most one worker asks for | 1323 # that it needs a new worker pool, so that at most one worker asks for |
1293 # a new worker pool at once. | 1324 # a new worker pool at once. |
1294 try: | 1325 try: |
1295 if not is_main_thread: | 1326 if not is_main_thread: |
1296 worker_checking_level_lock.acquire() | 1327 worker_checking_level_lock.acquire() |
(...skipping 11 matching lines...)
1308 need_pool_or_done_cond.wait() | 1339 need_pool_or_done_cond.wait() |
1309 finally: | 1340 finally: |
1310 if not is_main_thread: | 1341 if not is_main_thread: |
1311 worker_checking_level_lock.release() | 1342 worker_checking_level_lock.release() |
1312 | 1343 |
1313 # If we're running in this process, create a separate task queue. Otherwise, | 1344 # If we're running in this process, create a separate task queue. Otherwise, |
1314 # if Apply has already been called with process_count > 1, then there will | 1345 # if Apply has already been called with process_count > 1, then there will |
1315 # be consumer pools trying to use our processes. | 1346 # be consumer pools trying to use our processes. |
1316 if process_count > 1: | 1347 if process_count > 1: |
1317 task_queue = task_queues[self.recursive_apply_level] | 1348 task_queue = task_queues[self.recursive_apply_level] |
| 1349 elif self.multiprocessing_is_available: |
| 1350 task_queue = _NewMultiprocessingQueue() |
1318 else: | 1351 else: |
1319 task_queue = _NewMultiprocessingQueue() | 1352 task_queue = _NewThreadsafeQueue() |
1320 | 1353 |
1321 # Kick off a producer thread to throw tasks in the global task queue. We | 1354 # Kick off a producer thread to throw tasks in the global task queue. We |
1322 # do this asynchronously so that the main thread can be free to create new | 1355 # do this asynchronously so that the main thread can be free to create new |
1323 # consumer pools when needed (otherwise, any thread with a task that needs | 1356 # consumer pools when needed (otherwise, any thread with a task that needs |
1324 # a new consumer pool must block until we're completely done producing; in | 1357 # a new consumer pool must block until we're completely done producing; in |
1325 # the worst case, every worker blocks on such a call and the producer fills | 1358 # the worst case, every worker blocks on such a call and the producer fills |
1326 # up the task queue before it finishes, so we block forever). | 1359 # up the task queue before it finishes, so we block forever). |
1327 producer_thread = ProducerThread(copy.copy(self), args_iterator, caller_id, | 1360 producer_thread = ProducerThread(copy.copy(self), args_iterator, caller_id, |
1328 func, task_queue, should_return_results, | 1361 func, task_queue, should_return_results, |
1329 exception_handler, arg_checker, | 1362 exception_handler, arg_checker, |
(...skipping 57 matching lines...)
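Editor's note: the producer comment above carries the key deadlock argument: with a bounded queue, a blocking producer plus workers blocked on consumer-pool creation could wait on each other forever. The asynchronous shape reduced to stdlib pieces (illustrative; args_list and the maxsize are placeholders, not gslib's ProducerThread):

    import threading
    import Queue  # Python 2 stdlib; 'queue' in Python 3

    args_list = range(1000)        # placeholder workload

    def Produce(args_list, task_queue):
      for args in args_list:
        task_queue.put(args)       # may block once the queue is full

    task_queue = Queue.Queue(maxsize=100)
    producer = threading.Thread(target=Produce, args=(args_list, task_queue))
    producer.daemon = True
    producer.start()               # main thread stays free to build consumer pools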
1387 of this thread. | 1420 of this thread. |
1388 is_blocking_call: True iff the call to Apply is blocked on this call | 1421 is_blocking_call: True iff the call to Apply is blocked on this call |
1389 (which is true iff process_count == 1), implying that | 1422 (which is true iff process_count == 1), implying that |
1390 _ApplyThreads must behave as a blocking call. | 1423 _ApplyThreads must behave as a blocking call. |
1391 """ | 1424 """ |
1392 self._ResetConnectionPool() | 1425 self._ResetConnectionPool() |
1393 self.recursive_apply_level = recursive_apply_level | 1426 self.recursive_apply_level = recursive_apply_level |
1394 | 1427 |
1395 task_queue = task_queue or task_queues[recursive_apply_level] | 1428 task_queue = task_queue or task_queues[recursive_apply_level] |
1396 | 1429 |
| 1430 # Ensure fairness across processes by filling our WorkerPool only with |
| 1431 # as many tasks as it has WorkerThreads. This semaphore is acquired each |
| 1432 # time that a task is retrieved from the queue and released each time |
| 1433 # a task is completed by a WorkerThread. |
| 1434 worker_semaphore = threading.BoundedSemaphore(thread_count) |
| 1435 |
1397 assert thread_count * process_count > 1, ( | 1436 assert thread_count * process_count > 1, ( |
1398 'Invalid state, calling command._ApplyThreads with only one thread ' | 1437 'Invalid state, calling command._ApplyThreads with only one thread ' |
1399 'and process.') | 1438 'and process.') |
| 1439 # TODO: Presently, this pool gets recreated with each call to Apply. We |
| 1440 # should be able to do it just once, at process creation time. |
1400 worker_pool = WorkerPool( | 1441 worker_pool = WorkerPool( |
1401 thread_count, self.logger, | 1442 thread_count, self.logger, worker_semaphore, |
1402 bucket_storage_uri_class=self.bucket_storage_uri_class, | 1443 bucket_storage_uri_class=self.bucket_storage_uri_class, |
1403 gsutil_api_map=self.gsutil_api_map, debug=self.debug) | 1444 gsutil_api_map=self.gsutil_api_map, debug=self.debug) |
1404 | 1445 |
1405 num_enqueued = 0 | 1446 num_enqueued = 0 |
1406 while True: | 1447 while True: |
| 1448 worker_semaphore.acquire() |
1407 task = task_queue.get() | 1449 task = task_queue.get() |
1408 if task.args != ZERO_TASKS_TO_DO_ARGUMENT: | 1450 if task.args != ZERO_TASKS_TO_DO_ARGUMENT: |
1409 # If we have no tasks to do and we're performing a blocking call, we | 1451 # If we have no tasks to do and we're performing a blocking call, we |
1410 # need a special signal to tell us to stop - otherwise, we block on | 1452 # need a special signal to tell us to stop - otherwise, we block on |
1411 # the call to task_queue.get() forever. | 1453 # the call to task_queue.get() forever. |
1412 worker_pool.AddTask(task) | 1454 worker_pool.AddTask(task) |
1413 num_enqueued += 1 | 1455 num_enqueued += 1 |
| 1456 else: |
| 1457 # No tasks remain; don't block the semaphore on WorkerThread completion. |
| 1458 worker_semaphore.release() |
1414 | 1459 |
1415 if is_blocking_call: | 1460 if is_blocking_call: |
1416 num_to_do = total_tasks[task.caller_id] | 1461 num_to_do = total_tasks[task.caller_id] |
1417 # The producer thread won't enqueue the last task until after it has | 1462 # The producer thread won't enqueue the last task until after it has |
1418 # updated total_tasks[caller_id], so we know that num_to_do < 0 implies | 1463 # updated total_tasks[caller_id], so we know that num_to_do < 0 implies |
1419 # we will do this check again. | 1464 # we will do this check again. |
1420 if num_to_do >= 0 and num_enqueued == num_to_do: | 1465 if num_to_do >= 0 and num_enqueued == num_to_do: |
1421 if thread_count == 1: | 1466 if thread_count == 1: |
1422 return | 1467 return |
1423 else: | 1468 else: |
(...skipping 157 matching lines...)
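Editor's note: pulling the new semaphore logic out of the diff noise, the per-process loop acquires before dequeuing so at most thread_count tasks are in flight locally, releases on completion inside WorkerThread.PerformTask, and releases immediately for the zero-tasks sentinel. A condensed paraphrase using this file's own names:

    worker_semaphore = threading.BoundedSemaphore(thread_count)

    while True:
      worker_semaphore.acquire()       # blocks once thread_count tasks are in flight
      task = task_queue.get()
      if task.args != ZERO_TASKS_TO_DO_ARGUMENT:
        worker_pool.AddTask(task)      # the WorkerThread releases when it finishes
      else:
        worker_semaphore.release()     # sentinel task: nothing will run, release now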
1581 total_tasks[self.caller_id] = num_tasks | 1626 total_tasks[self.caller_id] = num_tasks |
1582 if not cur_task: | 1627 if not cur_task: |
1583 # This happens if there were zero arguments to be put in the queue. | 1628 # This happens if there were zero arguments to be put in the queue. |
1584 cur_task = Task(None, ZERO_TASKS_TO_DO_ARGUMENT, self.caller_id, | 1629 cur_task = Task(None, ZERO_TASKS_TO_DO_ARGUMENT, self.caller_id, |
1585 None, None, None, None) | 1630 None, None, None, None) |
1586 self.task_queue.put(cur_task) | 1631 self.task_queue.put(cur_task) |
1587 | 1632 |
1588 # It's possible that the workers finished before we updated total_tasks, | 1633 # It's possible that the workers finished before we updated total_tasks, |
1589 # so we need to check here as well. | 1634 # so we need to check here as well. |
1590 _NotifyIfDone(self.caller_id, | 1635 _NotifyIfDone(self.caller_id, |
1591 caller_id_finished_count.Get(self.caller_id)) | 1636 caller_id_finished_count.get(self.caller_id)) |
1592 | 1637 |
1593 | 1638 |
1594 class WorkerPool(object): | 1639 class WorkerPool(object): |
1595 """Pool of worker threads to which tasks can be added.""" | 1640 """Pool of worker threads to which tasks can be added.""" |
1596 | 1641 |
1597 def __init__(self, thread_count, logger, bucket_storage_uri_class=None, | 1642 def __init__(self, thread_count, logger, worker_semaphore, |
1598 gsutil_api_map=None, debug=0): | 1643 bucket_storage_uri_class=None, gsutil_api_map=None, debug=0): |
1599 self.task_queue = _NewThreadsafeQueue() | 1644 self.task_queue = _NewThreadsafeQueue() |
1600 self.threads = [] | 1645 self.threads = [] |
1601 for _ in range(thread_count): | 1646 for _ in range(thread_count): |
1602 worker_thread = WorkerThread( | 1647 worker_thread = WorkerThread( |
1603 self.task_queue, logger, | 1648 self.task_queue, logger, worker_semaphore=worker_semaphore, |
1604 bucket_storage_uri_class=bucket_storage_uri_class, | 1649 bucket_storage_uri_class=bucket_storage_uri_class, |
1605 gsutil_api_map=gsutil_api_map, debug=debug) | 1650 gsutil_api_map=gsutil_api_map, debug=debug) |
1606 self.threads.append(worker_thread) | 1651 self.threads.append(worker_thread) |
1607 worker_thread.start() | 1652 worker_thread.start() |
1608 | 1653 |
1609 def AddTask(self, task): | 1654 def AddTask(self, task): |
1610 self.task_queue.put(task) | 1655 self.task_queue.put(task) |
1611 | 1656 |
1612 | 1657 |
1613 class WorkerThread(threading.Thread): | 1658 class WorkerThread(threading.Thread): |
1614 """Thread where all the work will be performed. | 1659 """Thread where all the work will be performed. |
1615 | 1660 |
1616 This makes the function calls for Apply and takes care of all error handling, | 1661 This makes the function calls for Apply and takes care of all error handling, |
1617 return value propagation, and shared_vars. | 1662 return value propagation, and shared_vars. |
1618 | 1663 |
1619 Note that this thread is NOT started upon instantiation because the function- | 1664 Note that this thread is NOT started upon instantiation because the function- |
1620 calling logic is also used in the single-threaded case. | 1665 calling logic is also used in the single-threaded case. |
1621 """ | 1666 """ |
1622 | 1667 |
1623 def __init__(self, task_queue, logger, bucket_storage_uri_class=None, | 1668 def __init__(self, task_queue, logger, worker_semaphore=None, |
1624 gsutil_api_map=None, debug=0): | 1669 bucket_storage_uri_class=None, gsutil_api_map=None, debug=0): |
1625 """Initializes the worker thread. | 1670 """Initializes the worker thread. |
1626 | 1671 |
1627 Args: | 1672 Args: |
1628 task_queue: The thread-safe queue from which this thread should obtain | 1673 task_queue: The thread-safe queue from which this thread should obtain |
1629 its work. | 1674 its work. |
1630 logger: Logger to use for this thread. | 1675 logger: Logger to use for this thread. |
| 1676 worker_semaphore: threading.BoundedSemaphore to be released each time a |
| 1677 task is completed, or None for single-threaded execution. |
1631 bucket_storage_uri_class: Class to instantiate for cloud StorageUris. | 1678 bucket_storage_uri_class: Class to instantiate for cloud StorageUris. |
1632 Settable for testing/mocking. | 1679 Settable for testing/mocking. |
1633 gsutil_api_map: Map of providers and API selector tuples to api classes | 1680 gsutil_api_map: Map of providers and API selector tuples to api classes |
1634 which can be used to communicate with those providers. | 1681 which can be used to communicate with those providers. |
1635 Used for instantiating the CloudApiDelegator class. | 1682 Used for instantiating the CloudApiDelegator class. |
1636 debug: debug level for the CloudApiDelegator class. | 1683 debug: debug level for the CloudApiDelegator class. |
1637 """ | 1684 """ |
1638 super(WorkerThread, self).__init__() | 1685 super(WorkerThread, self).__init__() |
1639 self.task_queue = task_queue | 1686 self.task_queue = task_queue |
| 1687 self.worker_semaphore = worker_semaphore |
1640 self.daemon = True | 1688 self.daemon = True |
1641 self.cached_classes = {} | 1689 self.cached_classes = {} |
1642 self.shared_vars_updater = _SharedVariablesUpdater() | 1690 self.shared_vars_updater = _SharedVariablesUpdater() |
1643 | 1691 |
1644 self.thread_gsutil_api = None | 1692 self.thread_gsutil_api = None |
1645 if bucket_storage_uri_class and gsutil_api_map: | 1693 if bucket_storage_uri_class and gsutil_api_map: |
1646 self.thread_gsutil_api = CloudApiDelegator( | 1694 self.thread_gsutil_api = CloudApiDelegator( |
1647 bucket_storage_uri_class, gsutil_api_map, logger, debug=debug) | 1695 bucket_storage_uri_class, gsutil_api_map, logger, debug=debug) |
1648 | 1696 |
1649 def PerformTask(self, task, cls): | 1697 def PerformTask(self, task, cls): |
1650 """Makes the function call for a task. | 1698 """Makes the function call for a task. |
1651 | 1699 |
1652 Args: | 1700 Args: |
1653 task: The Task to perform. | 1701 task: The Task to perform. |
1654 cls: The instance of a class which gives context to the functions called | 1702 cls: The instance of a class which gives context to the functions called |
1655 by the Task's function. E.g., see SetAclFuncWrapper. | 1703 by the Task's function. E.g., see SetAclFuncWrapper. |
1656 """ | 1704 """ |
1657 caller_id = task.caller_id | 1705 caller_id = task.caller_id |
1658 try: | 1706 try: |
1659 results = task.func(cls, task.args, thread_state=self.thread_gsutil_api) | 1707 results = task.func(cls, task.args, thread_state=self.thread_gsutil_api) |
1660 if task.should_return_results: | 1708 if task.should_return_results: |
1661 global_return_values_map.Update(caller_id, [results], default_value=[]) | 1709 global_return_values_map.Increment(caller_id, [results], |
| 1710 default_value=[]) |
1662 except Exception, e: # pylint: disable=broad-except | 1711 except Exception, e: # pylint: disable=broad-except |
1663 _IncrementFailureCount() | 1712 _IncrementFailureCount() |
1664 if task.fail_on_error: | 1713 if task.fail_on_error: |
1665 raise # Only happens for single thread and process case. | 1714 raise # Only happens for single thread and process case. |
1666 else: | 1715 else: |
1667 try: | 1716 try: |
1668 task.exception_handler(cls, e) | 1717 task.exception_handler(cls, e) |
1669 except Exception, _: # pylint: disable=broad-except | 1718 except Exception, _: # pylint: disable=broad-except |
1670 # Don't allow callers to raise exceptions here and kill the worker | 1719 # Don't allow callers to raise exceptions here and kill the worker |
1671 # threads. | 1720 # threads. |
1672 cls.logger.debug( | 1721 cls.logger.debug( |
1673 'Caught exception while handling exception for %s:\n%s', | 1722 'Caught exception while handling exception for %s:\n%s', |
1674 task, traceback.format_exc()) | 1723 task, traceback.format_exc()) |
1675 finally: | 1724 finally: |
| 1725 if self.worker_semaphore: |
| 1726 self.worker_semaphore.release() |
1676 self.shared_vars_updater.Update(caller_id, cls) | 1727 self.shared_vars_updater.Update(caller_id, cls) |
1677 | 1728 |
1678 # Even if we encounter an exception, we still need to claim that | 1729 # Even if we encounter an exception, we still need to claim that |
1679 # the function finished executing. Otherwise, we won't know when to | 1730 # the function finished executing. Otherwise, we won't know when to |
1680 # stop waiting and return results. | 1731 # stop waiting and return results. |
1681 num_done = caller_id_finished_count.Update(caller_id, 1) | 1732 num_done = caller_id_finished_count.Increment(caller_id, 1) |
1682 | 1733 _NotifyIfDone(caller_id, num_done) |
1683 if cls.multiprocessing_is_available: | |
1684 _NotifyIfDone(caller_id, num_done) | |
1685 | 1734 |
1686 def run(self): | 1735 def run(self): |
1687 while True: | 1736 while True: |
1688 task = self.task_queue.get() | 1737 task = self.task_queue.get() |
1689 caller_id = task.caller_id | 1738 caller_id = task.caller_id |
1690 | 1739 |
1691 # Get the instance of the command with the appropriate context. | 1740 # Get the instance of the command with the appropriate context. |
1692 cls = self.cached_classes.get(caller_id, None) | 1741 cls = self.cached_classes.get(caller_id, None) |
1693 if not cls: | 1742 if not cls: |
1694 cls = copy.copy(class_map[caller_id]) | 1743 cls = copy.copy(class_map[caller_id]) |
(...skipping 39 matching lines...)
1734 key = (caller_id, name) | 1783 key = (caller_id, name) |
1735 last_value = self.last_shared_var_values.get(key, 0) | 1784 last_value = self.last_shared_var_values.get(key, 0) |
1736 # Compute the change made since the last time we updated here. This is | 1785 # Compute the change made since the last time we updated here. This is |
1737 # calculated by simply subtracting the last known value from the current | 1786 # calculated by simply subtracting the last known value from the current |
1738 # value in the class instance. | 1787 # value in the class instance. |
1739 delta = getattr(cls, name) - last_value | 1788 delta = getattr(cls, name) - last_value |
1740 self.last_shared_var_values[key] = delta + last_value | 1789 self.last_shared_var_values[key] = delta + last_value |
1741 | 1790 |
1742 # Update the globally-consistent value by simply increasing it by the | 1791 # Update the globally-consistent value by simply increasing it by the |
1743 # computed delta. | 1792 # computed delta. |
1744 shared_vars_map.Update(key, delta) | 1793 shared_vars_map.Increment(key, delta) |
1745 | 1794 |
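Editor's note: Update publishes only what changed since this thread's last publication, so concurrent workers add independent deltas instead of overwriting one global total. A compact analog of the bookkeeping (the shared map is assumed to be AtomicDict-style):

    class SharedVarsUpdaterSketch(object):
      """Illustrative analog of _SharedVariablesUpdater's delta bookkeeping."""

      def __init__(self):
        self.last_values = {}

      def Update(self, key, current_value, shared_map):
        # Publish only the local change since the previous Update call.
        delta = current_value - self.last_values.get(key, 0)
        self.last_values[key] = current_value
        shared_map.Increment(key, delta)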
1746 | 1795 |
1747 def _NotifyIfDone(caller_id, num_done): | 1796 def _NotifyIfDone(caller_id, num_done): |
1748 """Notify any threads waiting for results that something has finished. | 1797 """Notify any threads waiting for results that something has finished. |
1749 | 1798 |
1750 Each waiting thread will then need to check the call_completed_map to see if | 1799 Each waiting thread will then need to check the call_completed_map to see if |
1751 its work is done. | 1800 its work is done. |
1752 | 1801 |
1753 Note that num_done could be calculated here, but it is passed in as an | 1802 Note that num_done could be calculated here, but it is passed in as an |
1754 optimization so that we have one less call to a globally-locked data | 1803 optimization so that we have one less call to a globally-locked data |
(...skipping 46 matching lines...)
1801 def ResetFailureCount(): | 1850 def ResetFailureCount(): |
1802 """Resets the failure_count variable to 0 - useful if error is expected.""" | 1851 """Resets the failure_count variable to 0 - useful if error is expected.""" |
1803 try: | 1852 try: |
1804 global failure_count | 1853 global failure_count |
1805 if isinstance(failure_count, int): | 1854 if isinstance(failure_count, int): |
1806 failure_count = 0 | 1855 failure_count = 0 |
1807 else: # It's a multiprocessing.Value() of type 'i'. | 1856 else: # It's a multiprocessing.Value() of type 'i'. |
1808 failure_count = multiprocessing.Value('i', 0) | 1857 failure_count = multiprocessing.Value('i', 0) |
1809 except NameError: # If it wasn't initialized, Apply() wasn't called. | 1858 except NameError: # If it wasn't initialized, Apply() wasn't called. |
1810 pass | 1859 pass |
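Editor's note: _IncrementFailureCount's body is elided above, but given ResetFailureCount it presumably handles the same two representations of failure_count. A hedged sketch; an assumption, not the actual gslib implementation:

    def _IncrementFailureCountSketch():
      # Assumed analog of _IncrementFailureCount; the real body is skipped above.
      global failure_count
      if isinstance(failure_count, int):     # threading mode
        failure_count += 1
      else:                                  # multiprocessing.Value('i', ...)
        with failure_count.get_lock():
          failure_count.value += 1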