OLD | NEW |
(Empty) | |
| 1 # -*- coding: utf-8 -*- |
| 2 # Copyright 2010 Google Inc. All Rights Reserved. |
| 3 # |
| 4 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 # you may not use this file except in compliance with the License. |
| 6 # You may obtain a copy of the License at |
| 7 # |
| 8 # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 # |
| 10 # Unless required by applicable law or agreed to in writing, software |
| 11 # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 # See the License for the specific language governing permissions and |
| 14 # limitations under the License. |
| 15 """Base class for gsutil commands. |
| 16 |
| 17 In addition to base class code, this file contains helpers that depend on base |
| 18 class state (such as GetAndPrintAcl). In general, functions that depend on |
| 19 class state and that are used by multiple commands belong in this file. |
| 20 Functions that don't depend on class state belong in util.py, and non-shared |
| 21 helpers belong in individual subclasses. |
| 22 """ |
| 23 |
| 24 from __future__ import absolute_import |
| 25 |
| 26 import codecs |
| 27 from collections import namedtuple |
| 28 import copy |
| 29 import getopt |
| 30 import logging |
| 31 import multiprocessing |
| 32 import os |
| 33 import Queue |
| 34 import signal |
| 35 import sys |
| 36 import textwrap |
| 37 import threading |
| 38 import traceback |
| 39 |
| 40 import boto |
| 41 from boto.storage_uri import StorageUri |
| 42 import gslib |
| 43 from gslib.cloud_api import AccessDeniedException |
| 44 from gslib.cloud_api import ArgumentException |
| 45 from gslib.cloud_api import ServiceException |
| 46 from gslib.cloud_api_delegator import CloudApiDelegator |
| 47 from gslib.cs_api_map import ApiSelector |
| 48 from gslib.cs_api_map import GsutilApiMapFactory |
| 49 from gslib.exception import CommandException |
| 50 from gslib.help_provider import HelpProvider |
| 51 from gslib.name_expansion import NameExpansionIterator |
| 52 from gslib.name_expansion import NameExpansionResult |
| 53 from gslib.parallelism_framework_util import AtomicIncrementDict |
| 54 from gslib.parallelism_framework_util import BasicIncrementDict |
| 55 from gslib.parallelism_framework_util import ThreadAndProcessSafeDict |
| 56 from gslib.plurality_checkable_iterator import PluralityCheckableIterator |
| 57 from gslib.sig_handling import RegisterSignalHandler |
| 58 from gslib.storage_url import StorageUrlFromString |
| 59 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages |
| 60 from gslib.translation_helper import AclTranslation |
| 61 from gslib.util import GetConfigFilePath |
| 62 from gslib.util import GsutilStreamHandler |
| 63 from gslib.util import HaveFileUrls |
| 64 from gslib.util import HaveProviderUrls |
| 65 from gslib.util import IS_WINDOWS |
| 66 from gslib.util import MultiprocessingIsAvailable |
| 67 from gslib.util import NO_MAX |
| 68 from gslib.util import UrlsAreForSingleProvider |
| 69 from gslib.util import UTF8 |
| 70 from gslib.wildcard_iterator import CreateWildcardIterator |
| 71 |
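| # Number of sequential calls after which _SequentialApply suggests using |
| # gsutil -m (see _MaybeSuggestGsutilDashM). |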
| 72 OFFER_GSUTIL_M_SUGGESTION_THRESHOLD = 5 |
| 73 |
| 74 if IS_WINDOWS: |
| 75 import ctypes # pylint: disable=g-import-not-at-top |
| 76 |
| 77 |
| 78 def _DefaultExceptionHandler(cls, e): |
| 79 cls.logger.exception(e) |
| 80 |
| 81 |
| 82 def CreateGsutilLogger(command_name): |
| 83 """Creates a logger that resembles 'print' output. |
| 84 |
| 85 This logger abides by gsutil -d/-D/-DD/-q options. |
| 86 |
| 87 By default (if none of the above options is specified) the logger will display |
| 88 all messages logged with level INFO or above. Log propagation is disabled. |
| 89 |
| 90 Args: |
| 91 command_name: Command name to create logger for. |
| 92 |
| 93 Returns: |
| 94 A logger object. |
| 95 """ |
| 96 log = logging.getLogger(command_name) |
| 97 log.propagate = False |
| 98 log.setLevel(logging.root.level) |
| 99 log_handler = GsutilStreamHandler() |
| 100 log_handler.setFormatter(logging.Formatter('%(message)s')) |
| 101 # Commands that call other commands (like mv) would cause log handlers to be |
| 102 # added more than once, so avoid adding if one is already present. |
| 103 if not log.handlers: |
| 104 log.addHandler(log_handler) |
| 105 return log |
| 106 |
| 107 |
| 108 def _UrlArgChecker(command_instance, url): |
| 109 if not command_instance.exclude_symlinks: |
| 110 return True |
| 111 exp_src_url = url.expanded_storage_url |
| 112 if exp_src_url.IsFileUrl() and os.path.islink(exp_src_url.object_name): |
| 113 command_instance.logger.info('Skipping symbolic link %s...', exp_src_url) |
| 114 return False |
| 115 return True |
| 116 |
| 117 |
| 118 def DummyArgChecker(*unused_args): |
| 119 return True |
| 120 |
| 121 |
| 122 def SetAclFuncWrapper(cls, name_expansion_result, thread_state=None): |
| 123 return cls.SetAclFunc(name_expansion_result, thread_state=thread_state) |
| 124 |
| 125 |
| 126 def SetAclExceptionHandler(cls, e): |
| 127 """Exception handler that maintains state about post-completion status.""" |
| 128 cls.logger.error(str(e)) |
| 129 cls.everything_set_okay = False |
| 130 |
| 131 # We will keep this list of all thread- or process-safe queues ever created by |
| 132 # the main thread so that we can forcefully kill them upon shutdown. Otherwise, |
| 133 # we encounter a Python bug in which empty queues block forever on join (which |
| 134 # is called as part of the Python exit function cleanup) under the impression |
| 135 # that they are non-empty. |
| 136 # However, this also lets us shut down somewhat more cleanly when interrupted. |
| 137 queues = [] |
| 138 |
| 139 |
| 140 def _NewMultiprocessingQueue(): |
| 141 queue = multiprocessing.Queue(MAX_QUEUE_SIZE) |
| 142 queues.append(queue) |
| 143 return queue |
| 144 |
| 145 |
| 146 def _NewThreadsafeQueue(): |
| 147 queue = Queue.Queue(MAX_QUEUE_SIZE) |
| 148 queues.append(queue) |
| 149 return queue |
| 150 |
| 151 # The maximum size of a process- or thread-safe queue. Imposing this limit |
| 152 # prevents us from needing to hold an arbitrary amount of data in memory. |
| 153 # However, setting this number too high (e.g., >= 32768 on OS X) can cause |
| 154 # problems on some operating systems. |
| 155 MAX_QUEUE_SIZE = 32500 |
| 156 |
| 157 # The maximum depth of the tree of recursive calls to command.Apply. This is |
| 158 # an arbitrary limit put in place to prevent developers from accidentally |
| 159 # causing problems with infinite recursion, and it can be increased if needed. |
| 160 MAX_RECURSIVE_DEPTH = 5 |
| 161 |
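| # Placeholder task argument used elsewhere in the parallelism framework when |
| # an Apply call turns out to have no tasks to perform. |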
| 162 ZERO_TASKS_TO_DO_ARGUMENT = ('There were no', 'tasks to do') |
| 163 |
| 164 # Map from deprecated aliases to the current command and subcommands that |
| 165 # provide the same behavior. |
| 166 # TODO: Remove this map and deprecate old commands on 9/9/14. |
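| # For example, "gsutil getacl <url>" is handled as "gsutil acl get <url>". |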
| 167 OLD_ALIAS_MAP = {'chacl': ['acl', 'ch'], |
| 168 'getacl': ['acl', 'get'], |
| 169 'setacl': ['acl', 'set'], |
| 170 'getcors': ['cors', 'get'], |
| 171 'setcors': ['cors', 'set'], |
| 172 'chdefacl': ['defacl', 'ch'], |
| 173 'getdefacl': ['defacl', 'get'], |
| 174 'setdefacl': ['defacl', 'set'], |
| 175 'disablelogging': ['logging', 'set', 'off'], |
| 176 'enablelogging': ['logging', 'set', 'on'], |
| 177 'getlogging': ['logging', 'get'], |
| 178 'getversioning': ['versioning', 'get'], |
| 179 'setversioning': ['versioning', 'set'], |
| 180 'getwebcfg': ['web', 'get'], |
| 181 'setwebcfg': ['web', 'set']} |
| 182 |
| 183 |
| 184 # Declare all of the module level variables - see |
| 185 # InitializeMultiprocessingVariables for an explanation of why this is |
| 186 # necessary. |
| 187 # pylint: disable=global-at-module-level |
| 188 global manager, consumer_pools, task_queues, caller_id_lock, caller_id_counter |
| 189 global total_tasks, call_completed_map, global_return_values_map |
| 190 global need_pool_or_done_cond, caller_id_finished_count, new_pool_needed |
| 191 global current_max_recursive_level, shared_vars_map, shared_vars_list_map |
| 192 global class_map, worker_checking_level_lock, failure_count |
| 193 |
| 194 |
| 195 def InitializeMultiprocessingVariables(): |
| 196 """Initializes module-level variables that will be inherited by subprocesses. |
| 197 |
| 198 On Windows, a multiprocessing.Manager object should only |
| 199 be created within an "if __name__ == '__main__':" block. This function |
| 200 must be called; otherwise, every command that calls Command.Apply will fail. |
| 201 """ |
| 202 # This list of global variables must exactly match the above list of |
| 203 # declarations. |
| 204 # pylint: disable=global-variable-undefined |
| 205 global manager, consumer_pools, task_queues, caller_id_lock, caller_id_counter |
| 206 global total_tasks, call_completed_map, global_return_values_map |
| 207 global need_pool_or_done_cond, caller_id_finished_count, new_pool_needed |
| 208 global current_max_recursive_level, shared_vars_map, shared_vars_list_map |
| 209 global class_map, worker_checking_level_lock, failure_count |
| 210 |
| 211 manager = multiprocessing.Manager() |
| 212 |
| 213 consumer_pools = [] |
| 214 |
| 215 # List of all existing task queues - used by all pools to find the queue |
| 216 # that's appropriate for the given recursive_apply_level. |
| 217 task_queues = [] |
| 218 |
| 219 # Used to assign a globally unique caller ID to each Apply call. |
| 220 caller_id_lock = manager.Lock() |
| 221 caller_id_counter = multiprocessing.Value('i', 0) |
| 222 |
| 223 # Map from caller_id to total number of tasks to be completed for that ID. |
| 224 total_tasks = ThreadAndProcessSafeDict(manager) |
| 225 |
| 226 # Map from caller_id to a boolean which is True iff all its tasks are |
| 227 # finished. |
| 228 call_completed_map = ThreadAndProcessSafeDict(manager) |
| 229 |
| 230 # Used to keep track of the set of return values for each caller ID. |
| 231 global_return_values_map = AtomicIncrementDict(manager) |
| 232 |
| 233 # Condition used to notify any waiting threads that a task has finished or |
| 234 # that a call to Apply needs a new set of consumer processes. |
| 235 need_pool_or_done_cond = manager.Condition() |
| 236 |
| 237 # Lock used to prevent multiple worker processes from asking the main thread |
| 238 # to create a new consumer pool for the same level. |
| 239 worker_checking_level_lock = manager.Lock() |
| 240 |
| 241 # Map from caller_id to the current number of completed tasks for that ID. |
| 242 caller_id_finished_count = AtomicIncrementDict(manager) |
| 243 |
| 244 # Used as a way for the main thread to distinguish between being woken up |
| 245 # by another call finishing and being woken up by a call that needs a new set |
| 246 # of consumer processes. |
| 247 new_pool_needed = multiprocessing.Value('i', 0) |
| 248 |
| 249 current_max_recursive_level = multiprocessing.Value('i', 0) |
| 250 |
| 251 # Map from (caller_id, name) to the value of that shared variable. |
| 252 shared_vars_map = AtomicIncrementDict(manager) |
| 253 shared_vars_list_map = ThreadAndProcessSafeDict(manager) |
| 254 |
| 255 # Map from caller_id to calling class. |
| 256 class_map = manager.dict() |
| 257 |
| 258 # Number of tasks that resulted in an exception in calls to Apply(). |
| 259 failure_count = multiprocessing.Value('i', 0) |
| 260 |
| 261 |
| 262 # Each subclass of Command must define a property named 'command_spec' that is |
| 263 # an instance of the following class. |
| 264 CommandSpec = namedtuple('CommandSpec', [ |
| 265 # Name of command. |
| 266 'command_name', |
| 267 # Usage synopsis. |
| 268 'usage_synopsis', |
| 269 # List of command name aliases. |
| 270 'command_name_aliases', |
| 271 # Min number of args required by this command. |
| 272 'min_args', |
| 273 # Max number of args required by this command, or NO_MAX. |
| 274 'max_args', |
| 275 # Getopt-style string specifying acceptable sub args. |
| 276 'supported_sub_args', |
| 277 # True if file URLs are acceptable for this command. |
| 278 'file_url_ok', |
| 279 # True if provider-only URLs are acceptable for this command. |
| 280 'provider_url_ok', |
| 281 # Index in args of first URL arg. |
| 282 'urls_start_arg', |
| 283 # List of supported APIs |
| 284 'gs_api_support', |
| 285 # Default API to use for this command |
| 286 'gs_default_api', |
| 287 # Private arguments (for internal testing) |
| 288 'supported_private_args', |
| 289 'argparse_arguments', |
| 290 ]) |
| 291 |
| 292 |
| 293 class Command(HelpProvider): |
| 294 """Base class for all gsutil commands.""" |
| 295 |
| 296 # Each subclass must override this with an instance of CommandSpec. |
| 297 command_spec = None |
| 298 |
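| # Commands that combine sub-commands with sub-options defer argument checking |
| # until after their sub-command is parsed (see CheckArguments below). |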
| 299 _commands_with_subcommands_and_subopts = ['acl', 'defacl', 'logging', 'web', |
| 300 'notification'] |
| 301 |
| 302 # This keeps track of the recursive depth of the current call to Apply. |
| 303 recursive_apply_level = 0 |
| 304 |
| 305 # If the multiprocessing module isn't available, we'll use this to keep track |
| 306 # of the caller_id. |
| 307 sequential_caller_id = -1 |
| 308 |
| 309 @staticmethod |
| 310 def CreateCommandSpec(command_name, usage_synopsis=None, |
| 311 command_name_aliases=None, min_args=0, |
| 312 max_args=NO_MAX, supported_sub_args='', |
| 313 file_url_ok=False, provider_url_ok=False, |
| 314 urls_start_arg=0, gs_api_support=None, |
| 315 gs_default_api=None, supported_private_args=None, |
| 316 argparse_arguments=None): |
| 317 """Creates an instance of CommandSpec, with defaults.""" |
| 318 return CommandSpec( |
| 319 command_name=command_name, |
| 320 usage_synopsis=usage_synopsis, |
| 321 command_name_aliases=command_name_aliases or [], |
| 322 min_args=min_args, |
| 323 max_args=max_args, |
| 324 supported_sub_args=supported_sub_args, |
| 325 file_url_ok=file_url_ok, |
| 326 provider_url_ok=provider_url_ok, |
| 327 urls_start_arg=urls_start_arg, |
| 328 gs_api_support=gs_api_support or [ApiSelector.XML], |
| 329 gs_default_api=gs_default_api or ApiSelector.XML, |
| 330 supported_private_args=supported_private_args, |
| 331 argparse_arguments=argparse_arguments or []) |
| 332 |
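| # Illustrative (hypothetical) use of the helper above in a subclass; the |
| # command name and options here are made up: |
| #   command_spec = Command.CreateCommandSpec( |
| #       'frobnicate', usage_synopsis='frobnicate url...', min_args=1, |
| #       supported_sub_args='rR', file_url_ok=True) |
| |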
| 333 # Define a convenience property for command name, since it's used in many places. |
| 334 def _GetDefaultCommandName(self): |
| 335 return self.command_spec.command_name |
| 336 command_name = property(_GetDefaultCommandName) |
| 337 |
| 338 def _CalculateUrlsStartArg(self): |
| 339 """Calculate the index in args of the first URL arg. |
| 340 |
| 341 Returns: |
| 342 Index of the first URL arg (according to the command spec). |
| 343 """ |
| 344 return self.command_spec.urls_start_arg |
| 345 |
| 346 def _TranslateDeprecatedAliases(self, args): |
| 347 """Map deprecated aliases to the corresponding new command, and warn.""" |
| 348 new_command_args = OLD_ALIAS_MAP.get(self.command_alias_used, None) |
| 349 if new_command_args: |
| 350 # Prepend any subcommands for the new command. The command name itself |
| 351 # is not part of the args, so leave it out. |
| 352 args = new_command_args[1:] + args |
| 353 self.logger.warn('\n'.join(textwrap.wrap( |
| 354 ('You are using a deprecated alias, "%(used_alias)s", for the ' |
| 355 '"%(command_name)s" command. This will stop working on 9/9/2014. ' |
| 356 'Please use "%(command_name)s" with the appropriate sub-command in ' |
| 357 'the future. See "gsutil help %(command_name)s" for details.') % |
| 358 {'used_alias': self.command_alias_used, |
| 359 'command_name': self.command_name}))) |
| 360 return args |
| 361 |
| 362 def __init__(self, command_runner, args, headers, debug, parallel_operations, |
| 363 bucket_storage_uri_class, gsutil_api_class_map_factory, |
| 364 test_method=None, logging_filters=None, |
| 365 command_alias_used=None): |
| 366 """Instantiates a Command. |
| 367 |
| 368 Args: |
| 369 command_runner: CommandRunner (for commands built atop other commands). |
| 370 args: Command-line args (arg0 = actual arg, not command name ala bash). |
| 371 headers: Dictionary containing optional HTTP headers to pass to boto. |
| 372 debug: Debug level to pass in to boto connection (range 0..3). |
| 373 parallel_operations: Should command operations be executed in parallel? |
| 374 bucket_storage_uri_class: Class to instantiate for cloud StorageUris. |
| 375 Settable for testing/mocking. |
| 376 gsutil_api_class_map_factory: Creates map of cloud storage interfaces. |
| 377 Settable for testing/mocking. |
| 378 test_method: Optional general purpose method for testing purposes. |
| 379 Application and semantics of this method will vary by |
| 380 command and test type. |
| 381 logging_filters: Optional list of logging.Filters to apply to this |
| 382 command's logger. |
| 383 command_alias_used: The alias that was actually used when running this |
| 384 command (as opposed to the "official" command name, |
| 385 which will always correspond to the file name). |
| 386 |
| 387 Implementation note: subclasses shouldn't need to define an __init__ |
| 388 method, and instead depend on the shared initialization that happens |
| 389 here. If you do define an __init__ method in a subclass you'll need to |
| 390 explicitly call super().__init__(). But you're encouraged not to do this, |
| 391 because it will make changing the __init__ interface more painful. |
| 392 """ |
| 393 # Save class values from constructor params. |
| 394 self.command_runner = command_runner |
| 395 self.unparsed_args = args |
| 396 self.headers = headers |
| 397 self.debug = debug |
| 398 self.parallel_operations = parallel_operations |
| 399 self.bucket_storage_uri_class = bucket_storage_uri_class |
| 400 self.gsutil_api_class_map_factory = gsutil_api_class_map_factory |
| 401 self.test_method = test_method |
| 402 self.exclude_symlinks = False |
| 403 self.recursion_requested = False |
| 404 self.all_versions = False |
| 405 self.command_alias_used = command_alias_used |
| 406 |
| 407 # Global instance of a threaded logger object. |
| 408 self.logger = CreateGsutilLogger(self.command_name) |
| 409 if logging_filters: |
| 410 for log_filter in logging_filters: |
| 411 self.logger.addFilter(log_filter) |
| 412 |
| 413 if self.command_spec is None: |
| 414 raise CommandException('"%s" command implementation is missing a ' |
| 415 'command_spec definition.' % self.command_name) |
| 416 |
| 417 # Parse and validate args. |
| 418 self.args = self._TranslateDeprecatedAliases(args) |
| 419 self.ParseSubOpts() |
| 420 |
| 421 # Named tuple public functions start with _ |
| 422 # pylint: disable=protected-access |
| 423 self.command_spec = self.command_spec._replace( |
| 424 urls_start_arg=self._CalculateUrlsStartArg()) |
| 425 |
| 426 if (len(self.args) < self.command_spec.min_args |
| 427 or len(self.args) > self.command_spec.max_args): |
| 428 self.RaiseWrongNumberOfArgumentsException() |
| 429 |
| 430 if self.command_name not in self._commands_with_subcommands_and_subopts: |
| 431 self.CheckArguments() |
| 432 |
| 433 # Build the support and default maps from the command spec. |
| 434 support_map = { |
| 435 'gs': self.command_spec.gs_api_support, |
| 436 's3': [ApiSelector.XML] |
| 437 } |
| 438 default_map = { |
| 439 'gs': self.command_spec.gs_default_api, |
| 440 's3': ApiSelector.XML |
| 441 } |
| 442 self.gsutil_api_map = GsutilApiMapFactory.GetApiMap( |
| 443 self.gsutil_api_class_map_factory, support_map, default_map) |
| 444 |
| 445 self.project_id = None |
| 446 self.gsutil_api = CloudApiDelegator( |
| 447 bucket_storage_uri_class, self.gsutil_api_map, |
| 448 self.logger, debug=self.debug) |
| 449 |
| 450 # Cross-platform path to run gsutil binary. |
| 451 self.gsutil_cmd = '' |
| 452 # If running on Windows, invoke python interpreter explicitly. |
| 453 if gslib.util.IS_WINDOWS: |
| 454 self.gsutil_cmd += 'python ' |
| 455 # Add full path to gsutil to make sure we test the correct version. |
| 456 self.gsutil_path = gslib.GSUTIL_PATH |
| 457 self.gsutil_cmd += self.gsutil_path |
| 458 |
| 459 # We're treating recursion_requested like it's used by all commands, but |
| 460 # only some of the commands accept the -R option. |
| 461 if self.sub_opts: |
| 462 for o, unused_a in self.sub_opts: |
| 463 if o == '-r' or o == '-R': |
| 464 self.recursion_requested = True |
| 465 break |
| 466 |
| 467 self.multiprocessing_is_available = MultiprocessingIsAvailable()[0] |
| 468 |
| 469 def RaiseWrongNumberOfArgumentsException(self): |
| 470 """Raises exception for wrong number of arguments supplied to command.""" |
| 471 if len(self.args) < self.command_spec.min_args: |
| 472 tail_str = 's' if self.command_spec.min_args > 1 else '' |
| 473 message = ('The %s command requires at least %d argument%s.' % |
| 474 (self.command_name, self.command_spec.min_args, tail_str)) |
| 475 else: |
| 476 message = ('The %s command accepts at most %d arguments.' % |
| 477 (self.command_name, self.command_spec.max_args)) |
| 478 message += ' Usage:\n%s\nFor additional help run:\n gsutil help %s' % ( |
| 479 self.command_spec.usage_synopsis, self.command_name) |
| 480 raise CommandException(message) |
| 481 |
| 482 def RaiseInvalidArgumentException(self): |
| 483 """Raises exception for specifying an invalid argument to command.""" |
| 484 message = ('Incorrect option(s) specified. Usage:\n%s\n' |
| 485 'For additional help run:\n gsutil help %s' % ( |
| 486 self.command_spec.usage_synopsis, self.command_name)) |
| 487 raise CommandException(message) |
| 488 |
| 489 def ParseSubOpts(self, check_args=False): |
| 490 """Parses sub-opt args. |
| 491 |
| 492 Args: |
| 493 check_args: True to have CheckArguments() called after parsing. |
| 494 |
| 495 Populates: |
| 496 (self.sub_opts, self.args) from parsing. |
| 497 |
| 498 Raises: RaiseInvalidArgumentException if invalid args specified. |
| 499 """ |
| 500 try: |
| 501 self.sub_opts, self.args = getopt.getopt( |
| 502 self.args, self.command_spec.supported_sub_args, |
| 503 self.command_spec.supported_private_args or []) |
| 504 except getopt.GetoptError: |
| 505 self.RaiseInvalidArgumentException() |
| 506 if check_args: |
| 507 self.CheckArguments() |
| 508 |
| 509 def CheckArguments(self): |
| 510 """Checks that command line arguments match the command_spec. |
| 511 |
| 512 Any commands in self._commands_with_subcommands_and_subopts are responsible |
| 513 for calling this method after handling initial parsing of their arguments. |
| 514 This prevents commands with sub-commands as well as options from breaking |
| 515 the parsing of getopt. |
| 516 |
| 517 TODO: Provide a function to parse commands and sub-commands more |
| 518 intelligently once we stop allowing the deprecated command versions. |
| 519 |
| 520 Raises: |
| 521 CommandException if the arguments don't match. |
| 522 """ |
| 523 |
| 524 if (not self.command_spec.file_url_ok |
| 525 and HaveFileUrls(self.args[self.command_spec.urls_start_arg:])): |
| 526 raise CommandException('"%s" command does not support "file://" URLs. ' |
| 527 'Did you mean to use a gs:// URL?' % |
| 528 self.command_name) |
| 529 if (not self.command_spec.provider_url_ok |
| 530 and HaveProviderUrls(self.args[self.command_spec.urls_start_arg:])): |
| 531 raise CommandException('"%s" command does not support provider-only ' |
| 532 'URLs.' % self.command_name) |
| 533 |
| 534 def WildcardIterator(self, url_string, all_versions=False): |
| 535 """Helper to instantiate gslib.WildcardIterator. |
| 536 |
| 537 Args are same as gslib.WildcardIterator interface, but this method fills in |
| 538 most of the values from instance state. |
| 539 |
| 540 Args: |
| 541 url_string: URL string naming wildcard objects to iterate. |
| 542 all_versions: If true, the iterator yields all versions of objects |
| 543 matching the wildcard. If false, yields just the live |
| 544 object version. |
| 545 |
| 546 Returns: |
| 547 WildcardIterator for use by caller. |
| 548 """ |
| 549 return CreateWildcardIterator( |
| 550 url_string, self.gsutil_api, all_versions=all_versions, |
| 551 debug=self.debug, project_id=self.project_id) |
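| # Illustrative (hypothetical) use of the helper above: |
| #   for blr in self.WildcardIterator('gs://example-bucket/**').IterObjects(): |
| #     ...  # each blr is a BucketListingReference for a matching object |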
| 552 |
| 553 def RunCommand(self): |
| 554 """Abstract function in base class. Subclasses must implement this. |
| 555 |
| 556 The return value of this function will be used as the exit status of the |
| 557 process, so subclass commands should return an integer exit code (0 for |
| 558 success, a value in [1,255] for failure). |
| 559 """ |
| 560 raise CommandException('Command %s is missing its RunCommand() ' |
| 561 'implementation' % self.command_name) |
| 562 |
| 563 ############################################################ |
| 564 # Shared helper functions that depend on base class state. # |
| 565 ############################################################ |
| 566 |
| 567 def ApplyAclFunc(self, acl_func, acl_excep_handler, url_strs): |
| 568 """Sets the standard or default object ACL depending on self.command_name. |
| 569 |
| 570 Args: |
| 571 acl_func: ACL function to be passed to Apply. |
| 572 acl_excep_handler: ACL exception handler to be passed to Apply. |
| 573 url_strs: URL strings on which to set ACL. |
| 574 |
| 575 Raises: |
| 576 CommandException if an ACL could not be set. |
| 577 """ |
| 578 multi_threaded_url_args = [] |
| 579 # Handle bucket ACL setting operations single-threaded, because |
| 580 # our threading machinery currently assumes it's working with objects |
| 581 # (name_expansion_iterator), and normally we wouldn't expect users to need |
| 582 # to set ACLs on huge numbers of buckets at once anyway. |
| 583 for url_str in url_strs: |
| 584 url = StorageUrlFromString(url_str) |
| 585 if url.IsCloudUrl() and url.IsBucket(): |
| 586 if self.recursion_requested: |
| 587 # If user specified -R option, convert any bucket args to bucket |
| 588 # wildcards (e.g., gs://bucket/*), to prevent the operation from |
| 589 # being applied to the buckets themselves. |
| 590 url.object_name = '*' |
| 591 multi_threaded_url_args.append(url.url_string) |
| 592 else: |
| 593 # Convert to a NameExpansionResult so we can re-use the threaded |
| 594 # function for the single-threaded implementation. RefType is unused. |
| 595 for blr in self.WildcardIterator(url.url_string).IterBuckets( |
| 596 bucket_fields=['id']): |
| 597 name_expansion_for_url = NameExpansionResult( |
| 598 url, False, False, blr.storage_url) |
| 599 acl_func(self, name_expansion_for_url) |
| 600 else: |
| 601 multi_threaded_url_args.append(url_str) |
| 602 |
| 603 if len(multi_threaded_url_args) >= 1: |
| 604 name_expansion_iterator = NameExpansionIterator( |
| 605 self.command_name, self.debug, |
| 606 self.logger, self.gsutil_api, |
| 607 multi_threaded_url_args, self.recursion_requested, |
| 608 all_versions=self.all_versions, |
| 609 continue_on_error=self.continue_on_error or self.parallel_operations) |
| 610 |
| 611 # Perform requests in parallel (-m) mode, if requested, using |
| 612 # configured number of parallel processes and threads. Otherwise, |
| 613 # perform requests with sequential function calls in current process. |
| 614 self.Apply(acl_func, name_expansion_iterator, acl_excep_handler, |
| 615 fail_on_error=not self.continue_on_error) |
| 616 |
| 617 if not self.everything_set_okay and not self.continue_on_error: |
| 618 raise CommandException('ACLs for some objects could not be set.') |
| 619 |
| 620 def SetAclFunc(self, name_expansion_result, thread_state=None): |
| 621 """Sets the object ACL for the name_expansion_result provided. |
| 622 |
| 623 Args: |
| 624 name_expansion_result: NameExpansionResult describing the target object. |
| 625 thread_state: If present, use this gsutil Cloud API instance for the set. |
| 626 """ |
| 627 if thread_state: |
| 628 assert not self.def_acl |
| 629 gsutil_api = thread_state |
| 630 else: |
| 631 gsutil_api = self.gsutil_api |
| 632 op_string = 'default object ACL' if self.def_acl else 'ACL' |
| 633 url = name_expansion_result.expanded_storage_url |
| 634 self.logger.info('Setting %s on %s...', op_string, url) |
| 635 if (gsutil_api.GetApiSelector(url.scheme) == ApiSelector.XML |
| 636 and url.scheme != 'gs'): |
| 637 # If we are called with a non-google ACL model, we need to use the XML |
| 638 # passthrough. acl_arg should either be a canned ACL or an XML ACL. |
| 639 self._SetAclXmlPassthrough(url, gsutil_api) |
| 640 else: |
| 641 # Normal Cloud API path. acl_arg is a JSON ACL or a canned ACL. |
| 642 self._SetAclGsutilApi(url, gsutil_api) |
| 643 |
| 644 def _SetAclXmlPassthrough(self, url, gsutil_api): |
| 645 """Sets the ACL for the URL provided using the XML passthrough functions. |
| 646 |
| 647 This function assumes that self.def_acl, self.canned, |
| 648 and self.continue_on_error are initialized, and that self.acl_arg is |
| 649 either an XML string or a canned ACL string. |
| 650 |
| 651 Args: |
| 652 url: CloudURL to set the ACL on. |
| 653 gsutil_api: gsutil Cloud API to use for the ACL set. Must support XML |
| 654 passthrough functions. |
| 655 """ |
| 656 try: |
| 657 orig_prefer_api = gsutil_api.prefer_api |
| 658 gsutil_api.prefer_api = ApiSelector.XML |
| 659 gsutil_api.XmlPassThroughSetAcl( |
| 660 self.acl_arg, url, canned=self.canned, |
| 661 def_obj_acl=self.def_acl, provider=url.scheme) |
| 662 except ServiceException as e: |
| 663 if self.continue_on_error: |
| 664 self.everything_set_okay = False |
| 665 self.logger.error(e) |
| 666 else: |
| 667 raise |
| 668 finally: |
| 669 gsutil_api.prefer_api = orig_prefer_api |
| 670 |
| 671 def _SetAclGsutilApi(self, url, gsutil_api): |
| 672 """Sets the ACL for the URL provided using the gsutil Cloud API. |
| 673 |
| 674 This function assumes that self.def_acl, self.canned, |
| 675 and self.continue_on_error are initialized, and that self.acl_arg is |
| 676 either a JSON string or a canned ACL string. |
| 677 |
| 678 Args: |
| 679 url: CloudURL to set the ACL on. |
| 680 gsutil_api: gsutil Cloud API to use for the ACL set. |
| 681 """ |
| 682 try: |
| 683 if url.IsBucket(): |
| 684 if self.def_acl: |
| 685 if self.canned: |
| 686 gsutil_api.PatchBucket( |
| 687 url.bucket_name, apitools_messages.Bucket(), |
| 688 canned_def_acl=self.acl_arg, provider=url.scheme, fields=['id']) |
| 689 else: |
| 690 def_obj_acl = AclTranslation.JsonToMessage( |
| 691 self.acl_arg, apitools_messages.ObjectAccessControl) |
| 692 bucket_metadata = apitools_messages.Bucket( |
| 693 defaultObjectAcl=def_obj_acl) |
| 694 gsutil_api.PatchBucket(url.bucket_name, bucket_metadata, |
| 695 provider=url.scheme, fields=['id']) |
| 696 else: |
| 697 if self.canned: |
| 698 gsutil_api.PatchBucket( |
| 699 url.bucket_name, apitools_messages.Bucket(), |
| 700 canned_acl=self.acl_arg, provider=url.scheme, fields=['id']) |
| 701 else: |
| 702 bucket_acl = AclTranslation.JsonToMessage( |
| 703 self.acl_arg, apitools_messages.BucketAccessControl) |
| 704 bucket_metadata = apitools_messages.Bucket(acl=bucket_acl) |
| 705 gsutil_api.PatchBucket(url.bucket_name, bucket_metadata, |
| 706 provider=url.scheme, fields=['id']) |
| 707 else: # url.IsObject() |
| 708 if self.canned: |
| 709 gsutil_api.PatchObjectMetadata( |
| 710 url.bucket_name, url.object_name, apitools_messages.Object(), |
| 711 provider=url.scheme, generation=url.generation, |
| 712 canned_acl=self.acl_arg) |
| 713 else: |
| 714 object_acl = AclTranslation.JsonToMessage( |
| 715 self.acl_arg, apitools_messages.ObjectAccessControl) |
| 716 object_metadata = apitools_messages.Object(acl=object_acl) |
| 717 gsutil_api.PatchObjectMetadata(url.bucket_name, url.object_name, |
| 718 object_metadata, provider=url.scheme, |
| 719 generation=url.generation) |
| 720 except ArgumentException, e: |
| 721 raise |
| 722 except ServiceException, e: |
| 723 if self.continue_on_error: |
| 724 self.everything_set_okay = False |
| 725 self.logger.error(e) |
| 726 else: |
| 727 raise |
| 728 |
| 729 def SetAclCommandHelper(self, acl_func, acl_excep_handler): |
| 730 """Sets ACLs on the self.args using the passed-in acl function. |
| 731 |
| 732 Args: |
| 733 acl_func: ACL function to be passed to Apply. |
| 734 acl_excep_handler: ACL exception handler to be passed to Apply. |
| 735 """ |
| 736 acl_arg = self.args[0] |
| 737 url_args = self.args[1:] |
| 738 # Disallow multi-provider setacl requests, because there are differences in |
| 739 # the ACL models. |
| 740 if not UrlsAreForSingleProvider(url_args): |
| 741 raise CommandException('"%s" command spanning providers not allowed.' % |
| 742 self.command_name) |
| 743 |
| 744 # Determine whether acl_arg names a file containing XML ACL text vs. the |
| 745 # string name of a canned ACL. |
| 746 if os.path.isfile(acl_arg): |
| 747 with codecs.open(acl_arg, 'r', UTF8) as f: |
| 748 acl_arg = f.read() |
| 749 self.canned = False |
| 750 else: |
| 751 # No file exists, so expect a canned ACL string. |
| 752 # Canned ACLs are not supported in JSON and we need to use the XML API |
| 753 # to set them. |
| 754 # validate=False because we allow wildcard urls. |
| 755 storage_uri = boto.storage_uri( |
| 756 url_args[0], debug=self.debug, validate=False, |
| 757 bucket_storage_uri_class=self.bucket_storage_uri_class) |
| 758 |
| 759 canned_acls = storage_uri.canned_acls() |
| 760 if acl_arg not in canned_acls: |
| 761 raise CommandException('Invalid canned ACL "%s".' % acl_arg) |
| 762 self.canned = True |
| 763 |
| 764 # Used to track if any ACLs failed to be set. |
| 765 self.everything_set_okay = True |
| 766 self.acl_arg = acl_arg |
| 767 |
| 768 self.ApplyAclFunc(acl_func, acl_excep_handler, url_args) |
| 769 if not self.everything_set_okay and not self.continue_on_error: |
| 770 raise CommandException('ACLs for some objects could not be set.') |
| 771 |
| 772 def _WarnServiceAccounts(self): |
| 773 """Warns service account users who have received an AccessDenied error. |
| 774 |
| 775 When one of the metadata-related commands fails due to AccessDenied, the user |
| 776 must ensure that they are listed as an Owner in the API console. |
| 777 """ |
| 778 # Import this here so that the value will be set first in |
| 779 # gcs_oauth2_boto_plugin. |
| 780 # pylint: disable=g-import-not-at-top |
| 781 from gcs_oauth2_boto_plugin.oauth2_plugin import IS_SERVICE_ACCOUNT |
| 782 |
| 783 if IS_SERVICE_ACCOUNT: |
| 784 # This method is only called when canned ACLs are used, so the warning |
| 785 # definitely applies. |
| 786 self.logger.warning('\n'.join(textwrap.wrap( |
| 787 'It appears that your service account has been denied access while ' |
| 788 'attempting to perform a metadata operation. If you believe that you ' |
| 789 'should have access to this metadata (i.e., if it is associated with ' |
| 790 'your account), please make sure that your service account\'s email ' |
| 791 'address is listed as an Owner in the Team tab of the API console. ' |
| 792 'See "gsutil help creds" for further information.\n'))) |
| 793 |
| 794 def GetAndPrintAcl(self, url_str): |
| 795 """Prints the standard or default object ACL depending on self.command_name. |
| 796 |
| 797 Args: |
| 798 url_str: URL string to get ACL for. |
| 799 """ |
| 800 blr = self.GetAclCommandBucketListingReference(url_str) |
| 801 url = StorageUrlFromString(url_str) |
| 802 if (self.gsutil_api.GetApiSelector(url.scheme) == ApiSelector.XML |
| 803 and url.scheme != 'gs'): |
| 804 # Need to use XML passthrough. |
| 805 try: |
| 806 acl = self.gsutil_api.XmlPassThroughGetAcl( |
| 807 url, def_obj_acl=self.def_acl, provider=url.scheme) |
| 808 print acl.to_xml() |
| 809 except AccessDeniedException, _: |
| 810 self._WarnServiceAccounts() |
| 811 raise |
| 812 else: |
| 813 if self.command_name == 'defacl': |
| 814 acl = blr.root_object.defaultObjectAcl |
| 815 if not acl: |
| 816 self.logger.warn( |
| 817 'No default object ACL present for %s. This could occur if ' |
| 818 'the default object ACL is private, in which case objects ' |
| 819 'created in this bucket will be readable only by their ' |
| 820 'creators. It could also mean you do not have OWNER permission ' |
| 821 'on %s and therefore do not have permission to read the ' |
| 822 'default object ACL.', url_str, url_str) |
| 823 else: |
| 824 acl = blr.root_object.acl |
| 825 if not acl: |
| 826 self._WarnServiceAccounts() |
| 827 raise AccessDeniedException('Access denied. Please ensure you have ' |
| 828 'OWNER permission on %s.' % url_str) |
| 829 print AclTranslation.JsonFromMessage(acl) |
| 830 |
| 831 def GetAclCommandBucketListingReference(self, url_str): |
| 832 """Gets a single bucket listing reference for an acl get command. |
| 833 |
| 834 Args: |
| 835 url_str: URL string to get the bucket listing reference for. |
| 836 |
| 837 Returns: |
| 838 BucketListingReference for the URL string. |
| 839 |
| 840 Raises: |
| 841 CommandException if string did not result in exactly one reference. |
| 842 """ |
| 843 # We're guaranteed by the caller that we have the appropriate type of URL |
| 844 # string for the call (e.g., we will never be called with an object string |
| 845 # by getdefacl). |
| 846 wildcard_url = StorageUrlFromString(url_str) |
| 847 if wildcard_url.IsObject(): |
| 848 plurality_iter = PluralityCheckableIterator( |
| 849 self.WildcardIterator(url_str).IterObjects( |
| 850 bucket_listing_fields=['acl'])) |
| 851 else: |
| 852 # Bucket or provider. We call IterBuckets explicitly here to ensure that |
| 853 # the root object is populated with the acl. |
| 854 if self.command_name == 'defacl': |
| 855 bucket_fields = ['defaultObjectAcl'] |
| 856 else: |
| 857 bucket_fields = ['acl'] |
| 858 plurality_iter = PluralityCheckableIterator( |
| 859 self.WildcardIterator(url_str).IterBuckets( |
| 860 bucket_fields=bucket_fields)) |
| 861 if plurality_iter.IsEmpty(): |
| 862 raise CommandException('No URLs matched') |
| 863 if plurality_iter.HasPlurality(): |
| 864 raise CommandException( |
| 865 '%s matched more than one URL, which is not allowed by the %s ' |
| 866 'command' % (url_str, self.command_name)) |
| 867 return list(plurality_iter)[0] |
| 868 |
| 869 def _HandleMultiProcessingSigs(self, unused_signal_num, |
| 870 unused_cur_stack_frame): |
| 871 """Handles signals INT AND TERM during a multi-process/multi-thread request. |
| 872 |
| 873 Kills subprocesses. |
| 874 |
| 875 Args: |
| 876 unused_signal_num: signal generated by ^C. |
| 877 unused_cur_stack_frame: Current stack frame. |
| 878 """ |
| 879 # Note: This only works under Linux/MacOS. See |
| 880 # https://github.com/GoogleCloudPlatform/gsutil/issues/99 for details |
| 881 # about why making it work correctly across OS's is harder and still open. |
| 882 ShutDownGsutil() |
| 883 sys.stderr.write('Caught ^C - exiting\n') |
| 884 # Simply calling sys.exit(1) doesn't work - see above bug for details. |
| 885 KillProcess(os.getpid()) |
| 886 |
| 887 def GetSingleBucketUrlFromArg(self, arg, bucket_fields=None): |
| 888 """Gets a single bucket URL based on the command arguments. |
| 889 |
| 890 Args: |
| 891 arg: String argument to get bucket URL for. |
| 892 bucket_fields: Fields to populate for the bucket. |
| 893 |
| 894 Returns: |
| 895 (StorageUrl referring to a single bucket, Bucket metadata). |
| 896 |
| 897 Raises: |
| 898 CommandException if args did not match exactly one bucket. |
| 899 """ |
| 900 plurality_checkable_iterator = self.GetBucketUrlIterFromArg( |
| 901 arg, bucket_fields=bucket_fields) |
| 902 if plurality_checkable_iterator.HasPlurality(): |
| 903 raise CommandException( |
| 904 '%s matched more than one URL, which is not\n' |
| 905 'allowed by the %s command' % (arg, self.command_name)) |
| 906 blr = list(plurality_checkable_iterator)[0] |
| 907 return StorageUrlFromString(blr.url_string), blr.root_object |
| 908 |
| 909 def GetBucketUrlIterFromArg(self, arg, bucket_fields=None): |
| 910 """Gets a single bucket URL based on the command arguments. |
| 911 |
| 912 Args: |
| 913 arg: String argument to iterate over. |
| 914 bucket_fields: Fields to populate for the bucket. |
| 915 |
| 916 Returns: |
| 917 PluralityCheckableIterator over buckets. |
| 918 |
| 919 Raises: |
| 920 CommandException if iterator matched no buckets. |
| 921 """ |
| 922 arg_url = StorageUrlFromString(arg) |
| 923 if not arg_url.IsCloudUrl() or arg_url.IsObject(): |
| 924 raise CommandException('"%s" command must specify a bucket' % |
| 925 self.command_name) |
| 926 |
| 927 plurality_checkable_iterator = PluralityCheckableIterator( |
| 928 self.WildcardIterator(arg).IterBuckets( |
| 929 bucket_fields=bucket_fields)) |
| 930 if plurality_checkable_iterator.IsEmpty(): |
| 931 raise CommandException('No URLs matched') |
| 932 return plurality_checkable_iterator |
| 933 |
| 934 ###################### |
| 935 # Private functions. # |
| 936 ###################### |
| 937 |
| 938 def _ResetConnectionPool(self): |
| 939 # Each OS process needs to establish its own set of connections to |
| 940 # the server to avoid writes from different OS processes interleaving |
| 941 # onto the same socket (and garbling the underlying SSL session). |
| 942 # We ensure each process gets its own set of connections here by |
| 943 # closing all connections in the storage provider connection pool. |
| 944 connection_pool = StorageUri.provider_pool |
| 945 if connection_pool: |
| 946 for i in connection_pool: |
| 947 connection_pool[i].connection.close() |
| 948 |
| 949 def _GetProcessAndThreadCount(self, process_count, thread_count, |
| 950 parallel_operations_override): |
| 951 """Determines the values of process_count and thread_count. |
| 952 |
| 953 These values are used for parallel operations. |
| 954 If we're not performing operations in parallel, then ignore |
| 955 existing values and use process_count = thread_count = 1. |
| 956 |
| 957 Args: |
| 958 process_count: A positive integer or None. In the latter case, we read |
| 959 the value from the .boto config file. |
| 960 thread_count: A positive integer or None. In the latter case, we read |
| 961 the value from the .boto config file. |
| 962 parallel_operations_override: Used to override self.parallel_operations. |
| 963 This allows the caller to safely override |
| 964 the top-level flag for a single call. |
| 965 |
| 966 Returns: |
| 967 (process_count, thread_count): The number of processes and threads to use, |
| 968 respectively. |
| 969 """ |
| 970 # Set OS process and python thread count as a function of options |
| 971 # and config. |
| 972 if self.parallel_operations or parallel_operations_override: |
| 973 if not process_count: |
| 974 process_count = boto.config.getint( |
| 975 'GSUtil', 'parallel_process_count', |
| 976 gslib.commands.config.DEFAULT_PARALLEL_PROCESS_COUNT) |
| 977 if process_count < 1: |
| 978 raise CommandException('Invalid parallel_process_count "%d".' % |
| 979 process_count) |
| 980 if not thread_count: |
| 981 thread_count = boto.config.getint( |
| 982 'GSUtil', 'parallel_thread_count', |
| 983 gslib.commands.config.DEFAULT_PARALLEL_THREAD_COUNT) |
| 984 if thread_count < 1: |
| 985 raise CommandException('Invalid parallel_thread_count "%d".' % |
| 986 thread_count) |
| 987 else: |
| 988 # If -m not specified, then assume 1 OS process and 1 Python thread. |
| 989 process_count = 1 |
| 990 thread_count = 1 |
| 991 |
| 992 if IS_WINDOWS and process_count > 1: |
| 993 raise CommandException('\n'.join(textwrap.wrap( |
| 994 ('It is not possible to set process_count > 1 on Windows. Please ' |
| 995 'update your config file (located at %s) and set ' |
| 996 '"parallel_process_count = 1".') % |
| 997 GetConfigFilePath()))) |
| 998 self.logger.debug('process count: %d', process_count) |
| 999 self.logger.debug('thread count: %d', thread_count) |
| 1000 |
| 1001 return (process_count, thread_count) |
| 1002 |
| 1003 def _SetUpPerCallerState(self): |
| 1004 """Set up the state for a caller id, corresponding to one Apply call.""" |
| 1005 # Get a new caller ID. |
| 1006 with caller_id_lock: |
| 1007 caller_id_counter.value += 1 |
| 1008 caller_id = caller_id_counter.value |
| 1009 |
| 1010 # Create a copy of self with an incremented recursive level. This allows |
| 1011 # the class to report its level correctly if the function called from it |
| 1012 # also needs to call Apply. |
| 1013 cls = copy.copy(self) |
| 1014 cls.recursive_apply_level += 1 |
| 1015 |
| 1016 # Thread-safe loggers can't be pickled, so we will remove it here and |
| 1017 # recreate it later in the WorkerThread. This is not a problem since any |
| 1018 # logger with the same name will be treated as a singleton. |
| 1019 cls.logger = None |
| 1020 |
| 1021 # Likewise, the default API connection can't be pickled, but it is unused |
| 1022 # anyway as each thread gets its own API delegator. |
| 1023 cls.gsutil_api = None |
| 1024 |
| 1025 class_map[caller_id] = cls |
| 1026 total_tasks[caller_id] = -1 # -1 => the producer hasn't finished yet. |
| 1027 call_completed_map[caller_id] = False |
| 1028 caller_id_finished_count.Put(caller_id, 0) |
| 1029 global_return_values_map.Put(caller_id, []) |
| 1030 return caller_id |
| 1031 |
| 1032 def _CreateNewConsumerPool(self, num_processes, num_threads): |
| 1033 """Create a new pool of processes that call _ApplyThreads.""" |
| 1034 processes = [] |
| 1035 task_queue = _NewMultiprocessingQueue() |
| 1036 task_queues.append(task_queue) |
| 1037 |
| 1038 current_max_recursive_level.value += 1 |
| 1039 if current_max_recursive_level.value > MAX_RECURSIVE_DEPTH: |
| 1040 raise CommandException('Recursion depth of Apply calls is too great.') |
| 1041 for _ in range(num_processes): |
| 1042 recursive_apply_level = len(consumer_pools) |
| 1043 p = multiprocessing.Process( |
| 1044 target=self._ApplyThreads, |
| 1045 args=(num_threads, num_processes, recursive_apply_level)) |
| 1046 p.daemon = True |
| 1047 processes.append(p) |
| 1048 p.start() |
| 1049 consumer_pool = _ConsumerPool(processes, task_queue) |
| 1050 consumer_pools.append(consumer_pool) |
| 1051 |
| 1052 def Apply(self, func, args_iterator, exception_handler, |
| 1053 shared_attrs=None, arg_checker=_UrlArgChecker, |
| 1054 parallel_operations_override=False, process_count=None, |
| 1055 thread_count=None, should_return_results=False, |
| 1056 fail_on_error=False): |
| 1057 """Calls _Parallel/SequentialApply based on multiprocessing availability. |
| 1058 |
| 1059 Args: |
| 1060 func: Function to call to process each argument. |
| 1061 args_iterator: Iterable collection of arguments to be put into the |
| 1062 work queue. |
| 1063 exception_handler: Exception handler for WorkerThread class. |
| 1064 shared_attrs: List of attributes to manage across sub-processes. |
| 1065 arg_checker: Used to determine whether we should process the current |
| 1066 argument or simply skip it. Also handles any logging that |
| 1067 is specific to a particular type of argument. |
| 1068 parallel_operations_override: Used to override self.parallel_operations. |
| 1069 This allows the caller to safely override |
| 1070 the top-level flag for a single call. |
| 1071 process_count: The number of processes to use. If not specified, then |
| 1072 the configured default will be used. |
| 1073 thread_count: The number of threads per process. If not specified, then |
| 1074 the configured default will be used. |
| 1075 should_return_results: If true, then return the results of all successful |
| 1076 calls to func in a list. |
| 1077 fail_on_error: If true, then raise any exceptions encountered when |
| 1078 executing func. This is only applicable in the case of |
| 1079 process_count == thread_count == 1. |
| 1080 |
| 1081 Returns: |
| 1082 Results from spawned threads. |
| 1083 """ |
| 1084 if shared_attrs: |
| 1085 original_shared_vars_values = {} # We'll add these back in at the end. |
| 1086 for name in shared_attrs: |
| 1087 original_shared_vars_values[name] = getattr(self, name) |
| 1088 # By setting this to 0, we simplify the logic for computing deltas. |
| 1089 # We'll add it back after all of the tasks have been performed. |
| 1090 setattr(self, name, 0) |
| 1091 |
| 1092 (process_count, thread_count) = self._GetProcessAndThreadCount( |
| 1093 process_count, thread_count, parallel_operations_override) |
| 1094 |
| 1095 is_main_thread = (self.recursive_apply_level == 0 |
| 1096 and self.sequential_caller_id == -1) |
| 1097 |
| 1098 # We don't honor the fail_on_error flag in the case of multiple threads |
| 1099 # or processes. |
| 1100 fail_on_error = fail_on_error and (process_count * thread_count == 1) |
| 1101 |
| 1102 # Only check this from the first call in the main thread. Apart from the |
| 1103 # fact that it's wasteful to try this multiple times in general, it also |
| 1104 # will never work when called from a subprocess since we use daemon |
| 1105 # processes, and daemons can't create other processes. |
| 1106 if is_main_thread: |
| 1107 if ((not self.multiprocessing_is_available) |
| 1108 and thread_count * process_count > 1): |
| 1109 # Run the check again and log the appropriate warnings. This was run |
| 1110 # before, when the Command object was created, in order to calculate |
| 1111 # self.multiprocessing_is_available, but we don't want to print the |
| 1112 # warning until we're sure the user actually tried to use multiple |
| 1113 # threads or processes. |
| 1114 MultiprocessingIsAvailable(logger=self.logger) |
| 1115 |
| 1116 if self.multiprocessing_is_available: |
| 1117 caller_id = self._SetUpPerCallerState() |
| 1118 else: |
| 1119 self.sequential_caller_id += 1 |
| 1120 caller_id = self.sequential_caller_id |
| 1121 |
| 1122 if is_main_thread: |
| 1123 # pylint: disable=global-variable-undefined |
| 1124 global global_return_values_map, shared_vars_map, failure_count |
| 1125 global caller_id_finished_count, shared_vars_list_map |
| 1126 global_return_values_map = BasicIncrementDict() |
| 1127 global_return_values_map.Put(caller_id, []) |
| 1128 shared_vars_map = BasicIncrementDict() |
| 1129 caller_id_finished_count = BasicIncrementDict() |
| 1130 shared_vars_list_map = {} |
| 1131 failure_count = 0 |
| 1132 |
| 1133 # If any shared attributes passed by caller, create a dictionary of |
| 1134 # shared memory variables for every element in the list of shared |
| 1135 # attributes. |
| 1136 if shared_attrs: |
| 1137 shared_vars_list_map[caller_id] = shared_attrs |
| 1138 for name in shared_attrs: |
| 1139 shared_vars_map.Put((caller_id, name), 0) |
| 1140 |
| 1141 # Make all of the requested function calls. |
| 1142 if self.multiprocessing_is_available and thread_count * process_count > 1: |
| 1143 self._ParallelApply(func, args_iterator, exception_handler, caller_id, |
| 1144 arg_checker, process_count, thread_count, |
| 1145 should_return_results, fail_on_error) |
| 1146 else: |
| 1147 self._SequentialApply(func, args_iterator, exception_handler, caller_id, |
| 1148 arg_checker, should_return_results, fail_on_error) |
| 1149 |
| 1150 if shared_attrs: |
| 1151 for name in shared_attrs: |
| 1152 # This allows us to retain the original value of the shared variable, |
| 1153 # and simply apply the delta after what was done during the call to |
| 1154 # apply. |
| 1155 final_value = (original_shared_vars_values[name] + |
| 1156 shared_vars_map.Get((caller_id, name))) |
| 1157 setattr(self, name, final_value) |
| 1158 |
| 1159 if should_return_results: |
| 1160 return global_return_values_map.Get(caller_id) |
| 1161 |
| 1162 def _MaybeSuggestGsutilDashM(self): |
| 1163 """Outputs a sugestion to the user to use gsutil -m.""" |
| 1164 if not (boto.config.getint('GSUtil', 'parallel_process_count', 0) == 1 and |
| 1165 boto.config.getint('GSUtil', 'parallel_thread_count', 0) == 1): |
| 1166 self.logger.info('\n' + textwrap.fill( |
| 1167 '==> NOTE: You are performing a sequence of gsutil operations that ' |
| 1168 'may run significantly faster if you instead use gsutil -m %s ...\n' |
| 1169 'Please see the -m section under "gsutil help options" for further ' |
| 1170 'information about when gsutil -m can be advantageous.' |
| 1171 % sys.argv[1]) + '\n') |
| 1172 |
| 1173 # pylint: disable=g-doc-args |
| 1174 def _SequentialApply(self, func, args_iterator, exception_handler, caller_id, |
| 1175 arg_checker, should_return_results, fail_on_error): |
| 1176 """Performs all function calls sequentially in the current thread. |
| 1177 |
| 1178 No other threads or processes will be spawned. This degraded functionality |
| 1179 is used when the multiprocessing module is not available or the user |
| 1180 requests only one thread and one process. |
| 1181 """ |
| 1182 # Create a WorkerThread to handle all of the logic needed to actually call |
| 1183 # the function. Note that this thread will never be started, and all work |
| 1184 # is done in the current thread. |
| 1185 worker_thread = WorkerThread(None, False) |
| 1186 args_iterator = iter(args_iterator) |
| 1187 # Count of sequential calls that have been made. Used for producing |
| 1188 # suggestion to use gsutil -m. |
| 1189 sequential_call_count = 0 |
| 1190 while True: |
| 1191 |
| 1192 # Try to get the next argument, handling any exceptions that arise. |
| 1193 try: |
| 1194 args = args_iterator.next() |
| 1195 except StopIteration, e: |
| 1196 break |
| 1197 except Exception, e: # pylint: disable=broad-except |
| 1198 _IncrementFailureCount() |
| 1199 if fail_on_error: |
| 1200 raise |
| 1201 else: |
| 1202 try: |
| 1203 exception_handler(self, e) |
| 1204 except Exception, _: # pylint: disable=broad-except |
| 1205 self.logger.debug( |
| 1206 'Caught exception while handling exception for %s:\n%s', |
| 1207 func, traceback.format_exc()) |
| 1208 continue |
| 1209 |
| 1210 sequential_call_count += 1 |
| 1211 if sequential_call_count == OFFER_GSUTIL_M_SUGGESTION_THRESHOLD: |
| 1212 # Output suggestion near beginning of run, so user sees it early and can |
| 1213 # ^C and try gsutil -m. |
| 1214 self._MaybeSuggestGsutilDashM() |
| 1215 if arg_checker(self, args): |
| 1216 # Now that we actually have the next argument, perform the task. |
| 1217 task = Task(func, args, caller_id, exception_handler, |
| 1218 should_return_results, arg_checker, fail_on_error) |
| 1219 worker_thread.PerformTask(task, self) |
| 1220 if sequential_call_count >= gslib.util.GetTermLines(): |
| 1221 # Output suggestion at end of long run, in case user missed it at the |
| 1222 # start and it scrolled off-screen. |
| 1223 self._MaybeSuggestGsutilDashM() |
| 1224 |
| 1225 # pylint: disable=g-doc-args |
| 1226 def _ParallelApply(self, func, args_iterator, exception_handler, caller_id, |
| 1227 arg_checker, process_count, thread_count, |
| 1228 should_return_results, fail_on_error): |
| 1229 """Dispatches input arguments across a thread/process pool. |
| 1230 |
| 1231 Pools are composed of parallel OS processes and/or Python threads, |
| 1232 based on options (-m or not) and settings in the user's config file. |
| 1233 |
| 1234 If only one OS process is requested/available, dispatch requests across |
| 1235 threads in the current OS process. |
| 1236 |
| 1237 In the multi-process case, we will create one pool of worker processes for |
| 1238 each level of the tree of recursive calls to Apply. E.g., if A calls |
| 1239 Apply(B), and B ultimately calls Apply(C) followed by Apply(D), then we |
| 1240 will only create two sets of worker processes - B will execute in the first, |
| 1241 and C and D will execute in the second. If C is then changed to call |
| 1242 Apply(E) and D is changed to call Apply(F), then we will automatically |
| 1243 create a third set of processes (lazily, when needed) that will be used to |
| 1244 execute calls to E and F. This might look something like: |
| 1245 |
| 1246 Pool1 Executes: B |
| 1247 / \ |
| 1248 Pool2 Executes: C D |
| 1249 / \ |
| 1250 Pool3 Executes: E F |
| 1251 |
| 1252 Apply's parallelism is generally broken up into 4 cases: |
| 1253 - If process_count == thread_count == 1, then all tasks will be executed |
| 1254 by _SequentialApply. |
| 1255 - If process_count > 1 and thread_count == 1, then the main thread will |
| 1256 create a new pool of processes (if they don't already exist) and each of |
| 1257 those processes will execute the tasks in a single thread. |
| 1258 - If process_count == 1 and thread_count > 1, then this process will create |
| 1259 a new pool of threads to execute the tasks. |
| 1260 - If process_count > 1 and thread_count > 1, then the main thread will |
| 1261 create a new pool of processes (if they don't already exist) and each of |
| 1262 those processes will, upon creation, create a pool of threads to |
| 1263 execute the tasks. |
| 1264 |
| 1265 Args: |
| 1266 caller_id: The caller ID unique to this call to command.Apply. |
| 1267 See command.Apply for description of other arguments. |
| 1268 """ |
| 1269 is_main_thread = self.recursive_apply_level == 0 |
| 1270 |
| 1271 # Catch SIGINT and SIGTERM under Linux/MacOs so we can do cleanup before |
| 1272 # exiting. |
| 1273 if not IS_WINDOWS and is_main_thread: |
| 1274 # Register as a final signal handler because this handler kills the |
| 1275 # main gsutil process (so it must run last). |
| 1276 RegisterSignalHandler(signal.SIGINT, self._HandleMultiProcessingSigs, |
| 1277 is_final_handler=True) |
| 1278 RegisterSignalHandler(signal.SIGTERM, self._HandleMultiProcessingSigs, |
| 1279 is_final_handler=True) |
| 1280 |
| 1281 if not task_queues: |
| 1282 # The process we create will need to access the next recursive level |
| 1283 # of task queues if it makes a call to Apply, so we always keep around |
| 1284 # one more queue than we know we need. OTOH, if we don't create a new |
| 1285 # process, the existing process still needs a task queue to use. |
| 1286 task_queues.append(_NewMultiprocessingQueue()) |
| 1287 |
| 1288 if process_count > 1: # Handle process pool creation. |
| 1289 # Check whether this call will need a new set of workers. |
| 1290 |
| 1291 # Each worker must acquire a shared lock before notifying the main thread |
| 1292 # that it needs a new worker pool, so that at most one worker asks for |
| 1293 # a new worker pool at once. |
| 1294 try: |
| 1295 if not is_main_thread: |
| 1296 worker_checking_level_lock.acquire() |
| 1297 if self.recursive_apply_level >= current_max_recursive_level.value: |
| 1298 with need_pool_or_done_cond: |
| 1299 # Only the main thread is allowed to create new processes - |
| 1300 # otherwise, we will run into some Python bugs. |
| 1301 if is_main_thread: |
| 1302 self._CreateNewConsumerPool(process_count, thread_count) |
| 1303 else: |
| 1304 # Notify the main thread that we need a new consumer pool. |
| 1305 new_pool_needed.value = 1 |
| 1306 need_pool_or_done_cond.notify_all() |
| 1307 # The main thread will notify us when it finishes. |
| 1308 need_pool_or_done_cond.wait() |
| 1309 finally: |
| 1310 if not is_main_thread: |
| 1311 worker_checking_level_lock.release() |
| 1312 |
| 1313     # If this call's tasks will all run in this process (process_count == 1),
| 1314     # give it a private task queue; otherwise, consumer pools created by an
| 1315     # earlier Apply call with process_count > 1 could steal its tasks.
| 1316 if process_count > 1: |
| 1317 task_queue = task_queues[self.recursive_apply_level] |
| 1318 else: |
| 1319 task_queue = _NewMultiprocessingQueue() |
| 1320 |
| 1321 # Kick off a producer thread to throw tasks in the global task queue. We |
| 1322 # do this asynchronously so that the main thread can be free to create new |
| 1323 # consumer pools when needed (otherwise, any thread with a task that needs |
| 1324 # a new consumer pool must block until we're completely done producing; in |
| 1325 # the worst case, every worker blocks on such a call and the producer fills |
| 1326 # up the task queue before it finishes, so we block forever). |
| 1327 producer_thread = ProducerThread(copy.copy(self), args_iterator, caller_id, |
| 1328 func, task_queue, should_return_results, |
| 1329 exception_handler, arg_checker, |
| 1330 fail_on_error) |
| 1331 |
| 1332 if process_count > 1: |
| 1333 # Wait here until either: |
| 1334 # 1. We're the main thread and someone needs a new consumer pool - in |
| 1335 # which case we create one and continue waiting. |
| 1336 # 2. Someone notifies us that all of the work we requested is done, in |
| 1337 # which case we retrieve the results (if applicable) and stop |
| 1338 # waiting. |
| 1339 while True: |
| 1340 with need_pool_or_done_cond: |
| 1341         # Either our call is done, someone needs a new level of consumer
| 1342         # pools, or the wakeup was meant for a different caller. The first
| 1343         # two cannot both be true, since the main thread is blocked on any
| 1344         # other ongoing calls to Apply, and a thread would not ask for a
| 1345         # new consumer pool unless it had more work to do.
| 1346 if call_completed_map[caller_id]: |
| 1347 break |
| 1348 elif is_main_thread and new_pool_needed.value: |
| 1349 new_pool_needed.value = 0 |
| 1350 self._CreateNewConsumerPool(process_count, thread_count) |
| 1351 need_pool_or_done_cond.notify_all() |
| 1352 |
| 1353 # Note that we must check the above conditions before the wait() call; |
| 1354 # otherwise, the notification can happen before we start waiting, in |
| 1355 # which case we'll block forever. |
| 1356 need_pool_or_done_cond.wait() |
| 1357 else: # Using a single process. |
| 1358 self._ApplyThreads(thread_count, process_count, |
| 1359 self.recursive_apply_level, |
| 1360 is_blocking_call=True, task_queue=task_queue) |
| 1361 |
| 1362 # We encountered an exception from the producer thread before any arguments |
| 1363 # were enqueued, but it wouldn't have been propagated, so we'll now |
| 1364 # explicitly raise it here. |
| 1365 if producer_thread.unknown_exception: |
| 1366 # pylint: disable=raising-bad-type |
| 1367 raise producer_thread.unknown_exception |
| 1368 |
| 1369 # We encountered an exception from the producer thread while iterating over |
| 1370 # the arguments, so raise it here if we're meant to fail on error. |
| 1371 if producer_thread.iterator_exception and fail_on_error: |
| 1372 # pylint: disable=raising-bad-type |
| 1373 raise producer_thread.iterator_exception |
| 1374 |
| 1375 def _ApplyThreads(self, thread_count, process_count, recursive_apply_level, |
| 1376 is_blocking_call=False, task_queue=None): |
| 1377 """Assigns the work from the multi-process global task queue. |
| 1378 |
| 1379 Work is assigned to an individual process for later consumption either by |
| 1380 the WorkerThreads or (if thread_count == 1) this thread. |
| 1381 |
| 1382 Args: |
| 1383 thread_count: The number of threads used to perform the work. If 1, then |
| 1384 perform all work in this thread. |
| 1385 process_count: The number of processes used to perform the work. |
| 1386 recursive_apply_level: The depth in the tree of recursive calls to Apply |
| 1387 of this thread. |
| 1388 is_blocking_call: True iff the call to Apply is blocked on this call |
| 1389 (which is true iff process_count == 1), implying that |
| 1390 _ApplyThreads must behave as a blocking call. |
| 1391 """ |
| 1392 self._ResetConnectionPool() |
| 1393 self.recursive_apply_level = recursive_apply_level |
| 1394 |
| 1395 task_queue = task_queue or task_queues[recursive_apply_level] |
| 1396 |
| 1397 assert thread_count * process_count > 1, ( |
| 1398 'Invalid state, calling command._ApplyThreads with only one thread ' |
| 1399 'and process.') |
| 1400 worker_pool = WorkerPool( |
| 1401 thread_count, self.logger, |
| 1402 bucket_storage_uri_class=self.bucket_storage_uri_class, |
| 1403 gsutil_api_map=self.gsutil_api_map, debug=self.debug) |
| 1404 |
| 1405 num_enqueued = 0 |
| 1406 while True: |
| 1407 task = task_queue.get() |
| 1408 if task.args != ZERO_TASKS_TO_DO_ARGUMENT: |
| 1409 # If we have no tasks to do and we're performing a blocking call, we |
| 1410 # need a special signal to tell us to stop - otherwise, we block on |
| 1411 # the call to task_queue.get() forever. |
| 1412 worker_pool.AddTask(task) |
| 1413 num_enqueued += 1 |
| 1414 |
| 1415 if is_blocking_call: |
| 1416 num_to_do = total_tasks[task.caller_id] |
| 1417 # The producer thread won't enqueue the last task until after it has |
| 1418 # updated total_tasks[caller_id], so we know that num_to_do < 0 implies |
| 1419 # we will do this check again. |
| 1420 if num_to_do >= 0 and num_enqueued == num_to_do: |
| 1421 if thread_count == 1: |
| 1422 return |
| 1423 else: |
| 1424 while True: |
| 1425 with need_pool_or_done_cond: |
| 1426 if call_completed_map[task.caller_id]: |
| 1427 # We need to check this first, in case the condition was |
| 1428 # notified before we grabbed the lock. |
| 1429 return |
| 1430 need_pool_or_done_cond.wait() |
| 1431 |
| 1432 |
| 1433 # Below here lie classes and functions related to controlling the flow of tasks |
| 1434 # between various threads and processes. |
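|
| # A minimal, illustrative sketch of the producer/consumer pattern these
| # classes implement: one producer thread feeds a queue that several worker
| # threads drain, with a sentinel marking the end of the work (analogous to
| # ZERO_TASKS_TO_DO_ARGUMENT used elsewhere in this file). Purely explanatory
| # and never called by gsutil; it relies only on the stdlib Queue and
| # threading modules this file already imports.
| def _ExampleProducerConsumerSketch():
|   work_queue = Queue.Queue()
|   done_sentinel = object()  # Stands in for ZERO_TASKS_TO_DO_ARGUMENT.
|
|   def _Consume():
|     while True:
|       item = work_queue.get()
|       if item is done_sentinel:
|         return
|       # A real worker would perform one unit of work on item here.
|
|   consumers = [threading.Thread(target=_Consume) for _ in xrange(4)]
|   for thread in consumers:
|     thread.start()
|   for item in xrange(10):  # The producer side: enqueue all of the work.
|     work_queue.put(item)
|   for _ in consumers:
|     work_queue.put(done_sentinel)  # One sentinel per consumer.
|   for thread in consumers:
|     thread.join()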
| 1435 |
| 1436 |
| 1437 class _ConsumerPool(object): |
| 1438 |
| 1439 def __init__(self, processes, task_queue): |
| 1440 self.processes = processes |
| 1441 self.task_queue = task_queue |
| 1442 |
| 1443 def ShutDown(self): |
| 1444 for process in self.processes: |
| 1445 KillProcess(process.pid) |
| 1446 |
| 1447 |
| 1448 def KillProcess(pid): |
| 1449 """Make best effort to kill the given process. |
| 1450 |
| 1451 We ignore all exceptions so a caller looping through a list of processes will |
| 1452 continue attempting to kill each, even if one encounters a problem. |
| 1453 |
| 1454 Args: |
| 1455 pid: The process ID. |
| 1456 """ |
| 1457 try: |
| 1458     # os.kill is unavailable on Windows before Python 2.7 / 3.2; use ctypes.
| 1459 if IS_WINDOWS and ((2, 6) <= sys.version_info[:3] < (2, 7) or |
| 1460 (3, 0) <= sys.version_info[:3] < (3, 2)): |
| 1461 kernel32 = ctypes.windll.kernel32 |
| 1462 handle = kernel32.OpenProcess(1, 0, pid) |
| 1463 kernel32.TerminateProcess(handle, 0) |
| 1464 else: |
| 1465 os.kill(pid, signal.SIGKILL) |
| 1466 except: # pylint: disable=bare-except |
| 1467 pass |
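|
|
| # Illustrative usage sketch (a hypothetical helper, not part of gsutil):
| # because KillProcess swallows every exception, a caller can sweep an entire
| # list of child processes and still attempt each one, even if some have
| # already exited or cannot be opened.
| def _ExampleKillAllSketch(child_processes):
|   """Best-effort kill of every multiprocessing.Process in the list."""
|   for child in child_processes:
|     KillProcess(child.pid)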
| 1468 |
| 1469 |
| 1470 class Task(namedtuple('Task', ( |
| 1471 'func args caller_id exception_handler should_return_results arg_checker ' |
| 1472 'fail_on_error'))): |
| 1473 """Task class representing work to be completed. |
| 1474 |
| 1475 Args: |
| 1476 func: The function to be executed. |
| 1477 args: The arguments to func. |
| 1478 caller_id: The globally-unique caller ID corresponding to the Apply call. |
| 1479 exception_handler: The exception handler to use if the call to func fails. |
| 1480 should_return_results: True iff the results of this function should be |
| 1481 returned from the Apply call. |
| 1482 arg_checker: Used to determine whether we should process the current |
| 1483 argument or simply skip it. Also handles any logging that |
| 1484 is specific to a particular type of argument. |
| 1485 fail_on_error: If true, then raise any exceptions encountered when |
| 1486 executing func. This is only applicable in the case of |
| 1487 process_count == thread_count == 1. |
| 1488 """ |
| 1489 pass |
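|
|
| # An illustrative sketch of constructing a Task. The function, arguments, and
| # caller_id below are hypothetical stand-ins; in gsutil, Tasks are built by
| # Command._SequentialApply and by ProducerThread from values supplied to
| # Apply, and the arg_checker is a real callable rather than the trivial
| # lambda used here.
| def _ExampleBuildTaskSketch(func, args, caller_id, exception_handler):
|   return Task(func=func, args=args, caller_id=caller_id,
|               exception_handler=exception_handler,
|               should_return_results=True,
|               arg_checker=lambda cls, arg: True,  # Process every argument.
|               fail_on_error=False)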
| 1490 |
| 1491 |
| 1492 class ProducerThread(threading.Thread): |
| 1493 """Thread used to enqueue work for other processes and threads.""" |
| 1494 |
| 1495 def __init__(self, cls, args_iterator, caller_id, func, task_queue, |
| 1496 should_return_results, exception_handler, arg_checker, |
| 1497 fail_on_error): |
| 1498 """Initializes the producer thread. |
| 1499 |
| 1500 Args: |
| 1501 cls: Instance of Command for which this ProducerThread was created. |
| 1502 args_iterator: Iterable collection of arguments to be put into the |
| 1503 work queue. |
| 1504 caller_id: Globally-unique caller ID corresponding to this call to Apply. |
| 1505 func: The function to be called on each element of args_iterator. |
| 1506 task_queue: The queue into which tasks will be put, to later be consumed |
| 1507 by Command._ApplyThreads. |
| 1508 should_return_results: True iff the results for this call to command.Apply |
| 1509 were requested. |
| 1510 exception_handler: The exception handler to use when errors are |
| 1511 encountered during calls to func. |
| 1512 arg_checker: Used to determine whether we should process the current |
| 1513 argument or simply skip it. Also handles any logging that |
| 1514 is specific to a particular type of argument. |
| 1515 fail_on_error: If true, then raise any exceptions encountered when |
| 1516 executing func. This is only applicable in the case of |
| 1517 process_count == thread_count == 1. |
| 1518 """ |
| 1519 super(ProducerThread, self).__init__() |
| 1520 self.func = func |
| 1521 self.cls = cls |
| 1522 self.args_iterator = args_iterator |
| 1523 self.caller_id = caller_id |
| 1524 self.task_queue = task_queue |
| 1525 self.arg_checker = arg_checker |
| 1526 self.exception_handler = exception_handler |
| 1527 self.should_return_results = should_return_results |
| 1528 self.fail_on_error = fail_on_error |
| 1529 self.shared_variables_updater = _SharedVariablesUpdater() |
| 1530 self.daemon = True |
| 1531 self.unknown_exception = None |
| 1532 self.iterator_exception = None |
| 1533 self.start() |
| 1534 |
| 1535 def run(self): |
| 1536 num_tasks = 0 |
| 1537 cur_task = None |
| 1538 last_task = None |
| 1539 try: |
| 1540 args_iterator = iter(self.args_iterator) |
| 1541 while True: |
| 1542 try: |
| 1543 args = args_iterator.next() |
| 1544 except StopIteration, e: |
| 1545 break |
| 1546 except Exception, e: # pylint: disable=broad-except |
| 1547 _IncrementFailureCount() |
| 1548 if self.fail_on_error: |
| 1549 self.iterator_exception = e |
| 1550 raise |
| 1551 else: |
| 1552 try: |
| 1553 self.exception_handler(self.cls, e) |
| 1554 except Exception, _: # pylint: disable=broad-except |
| 1555 self.cls.logger.debug( |
| 1556 'Caught exception while handling exception for %s:\n%s', |
| 1557 self.func, traceback.format_exc()) |
| 1558 self.shared_variables_updater.Update(self.caller_id, self.cls) |
| 1559 continue |
| 1560 |
| 1561 if self.arg_checker(self.cls, args): |
| 1562 num_tasks += 1 |
| 1563 last_task = cur_task |
| 1564 cur_task = Task(self.func, args, self.caller_id, |
| 1565 self.exception_handler, self.should_return_results, |
| 1566 self.arg_checker, self.fail_on_error) |
| 1567 if last_task: |
| 1568 self.task_queue.put(last_task) |
| 1569 except Exception, e: # pylint: disable=broad-except |
| 1570 # This will also catch any exception raised due to an error in the |
| 1571 # iterator when fail_on_error is set, so check that we failed for some |
| 1572 # other reason before claiming that we had an unknown exception. |
| 1573 if not self.iterator_exception: |
| 1574 self.unknown_exception = e |
| 1575 finally: |
| 1576       # We must update total_tasks[caller_id] before enqueueing the last task.
| 1577       # Otherwise, a worker could retrieve and complete the last task, then
| 1578       # check total_tasks and wrongly conclude that production isn't finished,
| 1579       # all before we update total_tasks. Holding back the last task until
| 1580       # total_tasks has been updated prevents that race.
| 1581 total_tasks[self.caller_id] = num_tasks |
| 1582 if not cur_task: |
| 1583 # This happens if there were zero arguments to be put in the queue. |
| 1584 cur_task = Task(None, ZERO_TASKS_TO_DO_ARGUMENT, self.caller_id, |
| 1585 None, None, None, None) |
| 1586 self.task_queue.put(cur_task) |
| 1587 |
| 1588 # It's possible that the workers finished before we updated total_tasks, |
| 1589 # so we need to check here as well. |
| 1590 _NotifyIfDone(self.caller_id, |
| 1591 caller_id_finished_count.Get(self.caller_id)) |
| 1592 |
| 1593 |
| 1594 class WorkerPool(object): |
| 1595 """Pool of worker threads to which tasks can be added.""" |
| 1596 |
| 1597 def __init__(self, thread_count, logger, bucket_storage_uri_class=None, |
| 1598 gsutil_api_map=None, debug=0): |
| 1599 self.task_queue = _NewThreadsafeQueue() |
| 1600 self.threads = [] |
| 1601 for _ in range(thread_count): |
| 1602 worker_thread = WorkerThread( |
| 1603 self.task_queue, logger, |
| 1604 bucket_storage_uri_class=bucket_storage_uri_class, |
| 1605 gsutil_api_map=gsutil_api_map, debug=debug) |
| 1606 self.threads.append(worker_thread) |
| 1607 worker_thread.start() |
| 1608 |
| 1609 def AddTask(self, task): |
| 1610 self.task_queue.put(task) |
| 1611 |
| 1612 |
| 1613 class WorkerThread(threading.Thread): |
| 1614 """Thread where all the work will be performed. |
| 1615 |
| 1616 This makes the function calls for Apply and takes care of all error handling, |
| 1617 return value propagation, and shared_vars. |
| 1618 |
| 1619 Note that this thread is NOT started upon instantiation because the function- |
| 1620 calling logic is also used in the single-threaded case. |
| 1621 """ |
| 1622 |
| 1623 def __init__(self, task_queue, logger, bucket_storage_uri_class=None, |
| 1624 gsutil_api_map=None, debug=0): |
| 1625 """Initializes the worker thread. |
| 1626 |
| 1627 Args: |
| 1628 task_queue: The thread-safe queue from which this thread should obtain |
| 1629 its work. |
| 1630 logger: Logger to use for this thread. |
| 1631 bucket_storage_uri_class: Class to instantiate for cloud StorageUris. |
| 1632 Settable for testing/mocking. |
| 1633 gsutil_api_map: Map of providers and API selector tuples to api classes |
| 1634 which can be used to communicate with those providers. |
| 1635                       Used when instantiating the CloudApiDelegator class.
| 1636 debug: debug level for the CloudApiDelegator class. |
| 1637 """ |
| 1638 super(WorkerThread, self).__init__() |
| 1639 self.task_queue = task_queue |
| 1640 self.daemon = True |
| 1641 self.cached_classes = {} |
| 1642 self.shared_vars_updater = _SharedVariablesUpdater() |
| 1643 |
| 1644 self.thread_gsutil_api = None |
| 1645 if bucket_storage_uri_class and gsutil_api_map: |
| 1646 self.thread_gsutil_api = CloudApiDelegator( |
| 1647 bucket_storage_uri_class, gsutil_api_map, logger, debug=debug) |
| 1648 |
| 1649 def PerformTask(self, task, cls): |
| 1650 """Makes the function call for a task. |
| 1651 |
| 1652 Args: |
| 1653 task: The Task to perform. |
| 1654 cls: The instance of a class which gives context to the functions called |
| 1655 by the Task's function. E.g., see SetAclFuncWrapper. |
| 1656 """ |
| 1657 caller_id = task.caller_id |
| 1658 try: |
| 1659 results = task.func(cls, task.args, thread_state=self.thread_gsutil_api) |
| 1660 if task.should_return_results: |
| 1661 global_return_values_map.Update(caller_id, [results], default_value=[]) |
| 1662 except Exception, e: # pylint: disable=broad-except |
| 1663 _IncrementFailureCount() |
| 1664 if task.fail_on_error: |
| 1665 raise # Only happens for single thread and process case. |
| 1666 else: |
| 1667 try: |
| 1668 task.exception_handler(cls, e) |
| 1669 except Exception, _: # pylint: disable=broad-except |
| 1670 # Don't allow callers to raise exceptions here and kill the worker |
| 1671 # threads. |
| 1672 cls.logger.debug( |
| 1673 'Caught exception while handling exception for %s:\n%s', |
| 1674 task, traceback.format_exc()) |
| 1675 finally: |
| 1676 self.shared_vars_updater.Update(caller_id, cls) |
| 1677 |
| 1678       # Even if we encounter an exception, we still need to claim that the
| 1679       # function finished executing. Otherwise, we won't know when to stop
| 1680       # waiting and return results.
| 1681 num_done = caller_id_finished_count.Update(caller_id, 1) |
| 1682 |
| 1683 if cls.multiprocessing_is_available: |
| 1684 _NotifyIfDone(caller_id, num_done) |
| 1685 |
| 1686 def run(self): |
| 1687 while True: |
| 1688 task = self.task_queue.get() |
| 1689 caller_id = task.caller_id |
| 1690 |
| 1691 # Get the instance of the command with the appropriate context. |
| 1692 cls = self.cached_classes.get(caller_id, None) |
| 1693 if not cls: |
| 1694 cls = copy.copy(class_map[caller_id]) |
| 1695 cls.logger = CreateGsutilLogger(cls.command_name) |
| 1696 self.cached_classes[caller_id] = cls |
| 1697 |
| 1698 self.PerformTask(task, cls) |
| 1699 |
| 1700 |
| 1701 class _SharedVariablesUpdater(object): |
| 1702   """Used to update shared variables for a class in the global map.
| 1703 |
| 1704 Note that each thread will have its own instance of the calling class for |
| 1705 context, and it will also have its own instance of a |
| 1706 _SharedVariablesUpdater. This is used in the following way: |
| 1707 |
| 1708 1. Before any tasks are performed, each thread will get a copy of the |
| 1709 calling class, and the globally-consistent value of this shared variable |
| 1710 will be initialized to whatever it was before the call to Apply began. |
| 1711 |
| 1712 2. After each time a thread performs a task, it will look at the current |
| 1713 values of the shared variables in its instance of the calling class. |
| 1714 |
| 1715 2.A. For each such variable, it computes the delta of this variable |
| 1716 between the last known value for this class (which is stored in |
| 1717 a dict local to this class) and the current value of the variable |
| 1718 in the class. |
| 1719 |
| 1720 2.B. Using this delta, we update the last known value locally as well |
| 1721 as the globally-consistent value shared across all classes (the |
| 1722 globally consistent value is simply increased by the computed |
| 1723 delta). |
| 1724 """ |
| 1725 |
| 1726 def __init__(self): |
| 1727 self.last_shared_var_values = {} |
| 1728 |
| 1729 def Update(self, caller_id, cls): |
| 1730 """Update any shared variables with their deltas.""" |
| 1731 shared_vars = shared_vars_list_map.get(caller_id, None) |
| 1732 if shared_vars: |
| 1733 for name in shared_vars: |
| 1734 key = (caller_id, name) |
| 1735 last_value = self.last_shared_var_values.get(key, 0) |
| 1736 # Compute the change made since the last time we updated here. This is |
| 1737 # calculated by simply subtracting the last known value from the current |
| 1738 # value in the class instance. |
| 1739 delta = getattr(cls, name) - last_value |
| 1740 self.last_shared_var_values[key] = delta + last_value |
| 1741 |
| 1742 # Update the globally-consistent value by simply increasing it by the |
| 1743 # computed delta. |
| 1744 shared_vars_map.Update(key, delta) |
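|
|
| # A small worked example of the delta bookkeeping above, using a plain dict in
| # place of the process-safe shared_vars_map and a hypothetical shared variable
| # name. If a thread last saw a value of 3 and its class instance now reports
| # 5, it adds the delta of 2 to the globally-consistent value instead of
| # overwriting it, so concurrent updaters cannot clobber each other's work.
| # Purely illustrative; never called by gsutil.
| def _ExampleSharedVarDeltaSketch():
|   key = ('example-caller-id', 'total_bytes_transferred')
|   global_values = {key: 10}  # Globally-consistent value so far.
|   last_known_value = 3       # What this thread last recorded for the key.
|   current_value = 5          # Value now held by the thread's class instance.
|   delta = current_value - last_known_value  # 2
|   last_known_value += delta                 # Matches the instance value: 5.
|   global_values[key] += delta               # 10 -> 12.
|   return global_values[key]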
| 1745 |
| 1746 |
| 1747 def _NotifyIfDone(caller_id, num_done): |
| 1748 """Notify any threads waiting for results that something has finished. |
| 1749 |
| 1750 Each waiting thread will then need to check the call_completed_map to see if |
| 1751 its work is done. |
| 1752 |
| 1753 Note that num_done could be calculated here, but it is passed in as an |
| 1754 optimization so that we have one less call to a globally-locked data |
| 1755 structure. |
| 1756 |
| 1757 Args: |
| 1758 caller_id: The caller_id of the function whose progress we're checking. |
| 1759 num_done: The number of tasks currently completed for that caller_id. |
| 1760 """ |
| 1761 num_to_do = total_tasks[caller_id] |
| 1762 if num_to_do == num_done and num_to_do >= 0: |
| 1763 # Notify the Apply call that's sleeping that it's ready to return. |
| 1764 with need_pool_or_done_cond: |
| 1765 call_completed_map[caller_id] = True |
| 1766 need_pool_or_done_cond.notify_all() |
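|
|
| # An illustrative sketch of the wait/notify discipline used with
| # need_pool_or_done_cond throughout this file: waiters re-check their
| # predicate under the condition's lock before each wait() so a notification
| # cannot be lost, and notifiers update shared state and call notify_all()
| # while holding the lock. Generic stdlib example with hypothetical names;
| # never called by gsutil.
| def _ExampleConditionSketch():
|   state = {'done': False}
|   cond = threading.Condition()
|
|   def _Waiter():
|     with cond:
|       while not state['done']:  # Check before every wait.
|         cond.wait()
|
|   waiter = threading.Thread(target=_Waiter)
|   waiter.start()
|   with cond:
|     state['done'] = True
|     cond.notify_all()
|   waiter.join()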
| 1767 |
| 1768 |
| 1769 def ShutDownGsutil(): |
| 1770 """Shut down all processes in consumer pools in preparation for exiting.""" |
| 1771 for q in queues: |
| 1772 try: |
| 1773 q.cancel_join_thread() |
| 1774 except: # pylint: disable=bare-except |
| 1775 pass |
| 1776 for consumer_pool in consumer_pools: |
| 1777 consumer_pool.ShutDown() |
| 1778 |
| 1779 |
| 1780 # pylint: disable=global-variable-undefined |
| 1781 def _IncrementFailureCount(): |
| 1782 global failure_count |
| 1783 if isinstance(failure_count, int): |
| 1784 failure_count += 1 |
| 1785 else: # Otherwise it's a multiprocessing.Value() of type 'i'. |
| 1786 failure_count.value += 1 |
| 1787 |
| 1788 |
| 1789 # pylint: disable=global-variable-undefined |
| 1790 def GetFailureCount(): |
| 1791 """Returns the number of failures processed during calls to Apply().""" |
| 1792 try: |
| 1793 if isinstance(failure_count, int): |
| 1794 return failure_count |
| 1795 else: # It's a multiprocessing.Value() of type 'i'. |
| 1796 return failure_count.value |
| 1797 except NameError: # If it wasn't initialized, Apply() wasn't called. |
| 1798 return 0 |
| 1799 |
| 1800 |
| 1801 def ResetFailureCount(): |
| 1802 """Resets the failure_count variable to 0 - useful if error is expected.""" |
| 1803 try: |
| 1804 global failure_count |
| 1805 if isinstance(failure_count, int): |
| 1806 failure_count = 0 |
| 1807 else: # It's a multiprocessing.Value() of type 'i'. |
| 1808 failure_count = multiprocessing.Value('i', 0) |
| 1809 except NameError: # If it wasn't initialized, Apply() wasn't called. |
| 1810 pass |
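|
|
| # An illustrative sketch (hypothetical, standalone) of why the failure_count
| # helpers above branch on type: when Apply runs with multiprocessing
| # available, failure_count is a multiprocessing.Value('i', ...) shared across
| # processes and read via .value; in the threads-only case it is a plain int.
| def _ExampleFailureCountSketch(multiprocessing_is_available):
|   if multiprocessing_is_available:
|     count = multiprocessing.Value('i', 0)
|     count.value += 1
|     return count.value
|   else:
|     count = 0
|     count += 1
|     return count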
OLD | NEW |