| OLD | NEW |
| (Empty) |
| 1 # -*- coding: utf-8 -*- | |
| 2 # Copyright 2010 Google Inc. All Rights Reserved. | |
| 3 # | |
| 4 # Licensed under the Apache License, Version 2.0 (the "License"); | |
| 5 # you may not use this file except in compliance with the License. | |
| 6 # You may obtain a copy of the License at | |
| 7 # | |
| 8 # http://www.apache.org/licenses/LICENSE-2.0 | |
| 9 # | |
| 10 # Unless required by applicable law or agreed to in writing, software | |
| 11 # distributed under the License is distributed on an "AS IS" BASIS, | |
| 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| 13 # See the License for the specific language governing permissions and | |
| 14 # limitations under the License. | |
| 15 """Base class for gsutil commands. | |
| 16 | |
| 17 In addition to base class code, this file contains helpers that depend on base | |
| 18 class state (such as GetAndPrintAcl) In general, functions that depend on | |
| 19 class state and that are used by multiple commands belong in this file. | |
| 20 Functions that don't depend on class state belong in util.py, and non-shared | |
| 21 helpers belong in individual subclasses. | |
| 22 """ | |
| 23 | |
| 24 from __future__ import absolute_import | |
| 25 | |
| 26 import codecs | |
| 27 from collections import namedtuple | |
| 28 import copy | |
| 29 import getopt | |
| 30 import logging | |
| 31 import multiprocessing | |
| 32 import os | |
| 33 import Queue | |
| 34 import signal | |
| 35 import sys | |
| 36 import textwrap | |
| 37 import threading | |
| 38 import traceback | |
| 39 | |
| 40 import boto | |
| 41 from boto.storage_uri import StorageUri | |
| 42 import gslib | |
| 43 from gslib.cloud_api import AccessDeniedException | |
| 44 from gslib.cloud_api import ArgumentException | |
| 45 from gslib.cloud_api import ServiceException | |
| 46 from gslib.cloud_api_delegator import CloudApiDelegator | |
| 47 from gslib.cs_api_map import ApiSelector | |
| 48 from gslib.cs_api_map import GsutilApiMapFactory | |
| 49 from gslib.exception import CommandException | |
| 50 from gslib.help_provider import HelpProvider | |
| 51 from gslib.name_expansion import NameExpansionIterator | |
| 52 from gslib.name_expansion import NameExpansionResult | |
| 53 from gslib.parallelism_framework_util import AtomicIncrementDict | |
| 54 from gslib.parallelism_framework_util import BasicIncrementDict | |
| 55 from gslib.parallelism_framework_util import ThreadAndProcessSafeDict | |
| 56 from gslib.plurality_checkable_iterator import PluralityCheckableIterator | |
| 57 from gslib.sig_handling import RegisterSignalHandler | |
| 58 from gslib.storage_url import StorageUrlFromString | |
| 59 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_m
essages | |
| 60 from gslib.translation_helper import AclTranslation | |
| 61 from gslib.util import GetConfigFilePath | |
| 62 from gslib.util import GsutilStreamHandler | |
| 63 from gslib.util import HaveFileUrls | |
| 64 from gslib.util import HaveProviderUrls | |
| 65 from gslib.util import IS_WINDOWS | |
| 66 from gslib.util import MultiprocessingIsAvailable | |
| 67 from gslib.util import NO_MAX | |
| 68 from gslib.util import UrlsAreForSingleProvider | |
| 69 from gslib.util import UTF8 | |
| 70 from gslib.wildcard_iterator import CreateWildcardIterator | |
| 71 | |
# Threshold used when deciding whether to suggest running with the top-level
# -m (parallel) option; consumers of this constant are outside this chunk
# (name-based inference — verify against callers).
OFFER_GSUTIL_M_SUGGESTION_THRESHOLD = 5
| 73 | |
| 74 if IS_WINDOWS: | |
| 75 import ctypes # pylint: disable=g-import-not-at-top | |
| 76 | |
| 77 | |
| 78 def _DefaultExceptionHandler(cls, e): | |
| 79 cls.logger.exception(e) | |
| 80 | |
| 81 | |
def CreateGsutilLogger(command_name):
  """Creates a logger whose output resembles plain 'print' output.

  The logger inherits the root logger's level (so it abides by the gsutil
  -d/-D/-DD/-q options) and does not propagate to ancestor loggers.

  Args:
    command_name: Command name to create logger for.

  Returns:
    A logging.Logger object dedicated to command_name.
  """
  logger = logging.getLogger(command_name)
  logger.propagate = False
  logger.setLevel(logging.root.level)
  plain_handler = GsutilStreamHandler()
  plain_handler.setFormatter(logging.Formatter('%(message)s'))
  # Commands that invoke other commands (like mv) re-enter this function;
  # attaching a second handler would duplicate every message, so only add a
  # handler when none is present yet.
  if not logger.handlers:
    logger.addHandler(plain_handler)
  return logger
| 106 | |
| 107 | |
| 108 def _UrlArgChecker(command_instance, url): | |
| 109 if not command_instance.exclude_symlinks: | |
| 110 return True | |
| 111 exp_src_url = url.expanded_storage_url | |
| 112 if exp_src_url.IsFileUrl() and os.path.islink(exp_src_url.object_name): | |
| 113 command_instance.logger.info('Skipping symbolic link %s...', exp_src_url) | |
| 114 return False | |
| 115 return True | |
| 116 | |
| 117 | |
def DummyArgChecker(*unused_args):
  """No-op argument checker that accepts any task arguments."""
  return True
| 120 | |
| 121 | |
def SetAclFuncWrapper(cls, name_expansion_result, thread_state=None):
  """Module-level wrapper that delegates to the instance's SetAclFunc."""
  return cls.SetAclFunc(name_expansion_result, thread_state=thread_state)
| 124 | |
| 125 | |
def SetAclExceptionHandler(cls, e):
  """Exception handler that records post-completion failure status."""
  # Record the failure first, then log it; the two steps are independent.
  cls.everything_set_okay = False
  cls.logger.error(str(e))
| 130 | |
# We will keep this list of all thread- or process-safe queues ever created by
# the main thread so that we can forcefully kill them upon shutdown. Otherwise,
# we encounter a Python bug in which empty queues block forever on join (which
# is called as part of the Python exit function cleanup) under the impression
# that they are non-empty.
# However, this also lets us shut down somewhat more cleanly when interrupted.
# Populated by _NewMultiprocessingQueue and _NewThreadsafeQueue below.
queues = []
| 138 | |
| 139 | |
def _NewMultiprocessingQueue():
  """Returns a bounded multiprocessing queue, registered in global `queues`."""
  new_queue = multiprocessing.Queue(MAX_QUEUE_SIZE)
  queues.append(new_queue)
  return new_queue
| 144 | |
| 145 | |
def _NewThreadsafeQueue():
  """Returns a bounded thread-safe queue, registered in global `queues`."""
  new_queue = Queue.Queue(MAX_QUEUE_SIZE)
  queues.append(new_queue)
  return new_queue
| 150 | |
# The maximum size of a process- or thread-safe queue. Imposing this limit
# prevents us from needing to hold an arbitrary amount of data in memory.
# However, setting this number too high (e.g., >= 32768 on OS X) can cause
# problems on some operating systems.
MAX_QUEUE_SIZE = 32500

# The maximum depth of the tree of recursive calls to command.Apply. This is
# an arbitrary limit put in place to prevent developers from accidentally
# causing problems with infinite recursion, and it can be increased if needed.
MAX_RECURSIVE_DEPTH = 5

# Sentinel two-tuple used to signal that an Apply call produced no tasks;
# consumers of this constant are outside this chunk — verify before relying
# on its exact semantics.
ZERO_TASKS_TO_DO_ARGUMENT = ('There were no', 'tasks to do')

# Map from deprecated aliases to the current command and subcommands that
# provide the same behavior.
# TODO: Remove this map and deprecate old commands on 9/9/14.
OLD_ALIAS_MAP = {'chacl': ['acl', 'ch'],
                 'getacl': ['acl', 'get'],
                 'setacl': ['acl', 'set'],
                 'getcors': ['cors', 'get'],
                 'setcors': ['cors', 'set'],
                 'chdefacl': ['defacl', 'ch'],
                 'getdefacl': ['defacl', 'get'],
                 'setdefacl': ['defacl', 'set'],
                 'disablelogging': ['logging', 'set', 'off'],
                 'enablelogging': ['logging', 'set', 'on'],
                 'getlogging': ['logging', 'get'],
                 'getversioning': ['versioning', 'get'],
                 'setversioning': ['versioning', 'set'],
                 'getwebcfg': ['web', 'get'],
                 'setwebcfg': ['web', 'set']}
| 182 | |
| 183 | |
# Declare all of the module level variables - see
# InitializeMultiprocessingVariables for an explanation of why this is
# necessary.
# Note: at module scope these 'global' statements are no-ops to the
# interpreter; they serve as the single authoritative list of shared names,
# which InitializeMultiprocessingVariables must mirror exactly.
# pylint: disable=global-at-module-level
global manager, consumer_pools, task_queues, caller_id_lock, caller_id_counter
global total_tasks, call_completed_map, global_return_values_map
global need_pool_or_done_cond, caller_id_finished_count, new_pool_needed
global current_max_recursive_level, shared_vars_map, shared_vars_list_map
global class_map, worker_checking_level_lock, failure_count
| 193 | |
| 194 | |
def InitializeMultiprocessingVariables():
  """Initializes module-level variables that will be inherited by subprocesses.

  On Windows, a multiprocessing.Manager object should only
  be created within an "if __name__ == '__main__':" block. This function
  must be called, otherwise every command that calls Command.Apply will fail.
  """
  # This list of global variables must exactly match the above list of
  # declarations.
  # pylint: disable=global-variable-undefined
  global manager, consumer_pools, task_queues, caller_id_lock, caller_id_counter
  global total_tasks, call_completed_map, global_return_values_map
  global need_pool_or_done_cond, caller_id_finished_count, new_pool_needed
  global current_max_recursive_level, shared_vars_map, shared_vars_list_map
  global class_map, worker_checking_level_lock, failure_count

  manager = multiprocessing.Manager()

  # List of consumer process pools; populated outside this function.
  consumer_pools = []

  # List of all existing task queues - used by all pools to find the queue
  # that's appropriate for the given recursive_apply_level.
  task_queues = []

  # Used to assign a globally unique caller ID to each Apply call.
  caller_id_lock = manager.Lock()
  caller_id_counter = multiprocessing.Value('i', 0)

  # Map from caller_id to total number of tasks to be completed for that ID.
  total_tasks = ThreadAndProcessSafeDict(manager)

  # Map from caller_id to a boolean which is True iff all its tasks are
  # finished.
  call_completed_map = ThreadAndProcessSafeDict(manager)

  # Used to keep track of the set of return values for each caller ID.
  global_return_values_map = AtomicIncrementDict(manager)

  # Condition used to notify any waiting threads that a task has finished or
  # that a call to Apply needs a new set of consumer processes.
  need_pool_or_done_cond = manager.Condition()

  # Lock used to prevent multiple worker processes from asking the main thread
  # to create a new consumer pool for the same level.
  worker_checking_level_lock = manager.Lock()

  # Map from caller_id to the current number of completed tasks for that ID.
  caller_id_finished_count = AtomicIncrementDict(manager)

  # Used as a way for the main thread to distinguish between being woken up
  # by another call finishing and being woken up by a call that needs a new set
  # of consumer processes.
  new_pool_needed = multiprocessing.Value('i', 0)

  # Shared int; per its name, tracks the deepest Apply recursion level reached
  # (the code that updates it is outside this function).
  current_max_recursive_level = multiprocessing.Value('i', 0)

  # Map from (caller_id, name) to the value of that shared variable.
  shared_vars_map = AtomicIncrementDict(manager)
  shared_vars_list_map = ThreadAndProcessSafeDict(manager)

  # Map from caller_id to calling class.
  class_map = manager.dict()

  # Number of tasks that resulted in an exception in calls to Apply().
  failure_count = multiprocessing.Value('i', 0)
| 260 | |
| 261 | |
# Each subclass of Command must define a property named 'command_spec' that is
# an instance of the following class.
# Prefer constructing instances via Command.CreateCommandSpec, which fills in
# sensible defaults for most of these fields.
CommandSpec = namedtuple('CommandSpec', [
    # Name of command.
    'command_name',
    # Usage synopsis.
    'usage_synopsis',
    # List of command name aliases.
    'command_name_aliases',
    # Min number of args required by this command.
    'min_args',
    # Max number of args required by this command, or NO_MAX.
    'max_args',
    # Getopt-style string specifying acceptable sub args.
    'supported_sub_args',
    # True if file URLs are acceptable for this command.
    'file_url_ok',
    # True if provider-only URLs are acceptable for this command.
    'provider_url_ok',
    # Index in args of first URL arg.
    'urls_start_arg',
    # List of supported APIs.
    'gs_api_support',
    # Default API to use for this command.
    'gs_default_api',
    # Private arguments (for internal testing).
    'supported_private_args',
    # argparse-style argument definitions (consumed outside this chunk).
    'argparse_arguments',
])
| 291 | |
| 292 | |
class Command(HelpProvider):
  """Base class for all gsutil commands."""

  # Each subclass must override this with an instance of CommandSpec.
  command_spec = None

  # Commands in this list parse their own sub-commands and sub-options, so
  # __init__ skips the automatic CheckArguments() call for them (see the
  # CheckArguments docstring for why this matters to getopt parsing).
  _commands_with_subcommands_and_subopts = ['acl', 'defacl', 'logging', 'web',
                                            'notification']

  # This keeps track of the recursive depth of the current call to Apply.
  recursive_apply_level = 0

  # If the multiprocessing module isn't available, we'll use this to keep track
  # of the caller_id.
  sequential_caller_id = -1
| 308 | |
| 309 @staticmethod | |
| 310 def CreateCommandSpec(command_name, usage_synopsis=None, | |
| 311 command_name_aliases=None, min_args=0, | |
| 312 max_args=NO_MAX, supported_sub_args='', | |
| 313 file_url_ok=False, provider_url_ok=False, | |
| 314 urls_start_arg=0, gs_api_support=None, | |
| 315 gs_default_api=None, supported_private_args=None, | |
| 316 argparse_arguments=None): | |
| 317 """Creates an instance of CommandSpec, with defaults.""" | |
| 318 return CommandSpec( | |
| 319 command_name=command_name, | |
| 320 usage_synopsis=usage_synopsis, | |
| 321 command_name_aliases=command_name_aliases or [], | |
| 322 min_args=min_args, | |
| 323 max_args=max_args, | |
| 324 supported_sub_args=supported_sub_args, | |
| 325 file_url_ok=file_url_ok, | |
| 326 provider_url_ok=provider_url_ok, | |
| 327 urls_start_arg=urls_start_arg, | |
| 328 gs_api_support=gs_api_support or [ApiSelector.XML], | |
| 329 gs_default_api=gs_default_api or ApiSelector.XML, | |
| 330 supported_private_args=supported_private_args, | |
| 331 argparse_arguments=argparse_arguments or []) | |
| 332 | |
  # Define a convenience property for command name, since it's used many places.
  def _GetDefaultCommandName(self):
    """Returns the command name declared in this command's spec."""
    return self.command_spec.command_name
  # Read-only property; reflects command_spec.command_name.
  command_name = property(_GetDefaultCommandName)
| 337 | |
| 338 def _CalculateUrlsStartArg(self): | |
| 339 """Calculate the index in args of the first URL arg. | |
| 340 | |
| 341 Returns: | |
| 342 Index of the first URL arg (according to the command spec). | |
| 343 """ | |
| 344 return self.command_spec.urls_start_arg | |
| 345 | |
| 346 def _TranslateDeprecatedAliases(self, args): | |
| 347 """Map deprecated aliases to the corresponding new command, and warn.""" | |
| 348 new_command_args = OLD_ALIAS_MAP.get(self.command_alias_used, None) | |
| 349 if new_command_args: | |
| 350 # Prepend any subcommands for the new command. The command name itself | |
| 351 # is not part of the args, so leave it out. | |
| 352 args = new_command_args[1:] + args | |
| 353 self.logger.warn('\n'.join(textwrap.wrap( | |
| 354 ('You are using a deprecated alias, "%(used_alias)s", for the ' | |
| 355 '"%(command_name)s" command. This will stop working on 9/9/2014. ' | |
| 356 'Please use "%(command_name)s" with the appropriate sub-command in ' | |
| 357 'the future. See "gsutil help %(command_name)s" for details.') % | |
| 358 {'used_alias': self.command_alias_used, | |
| 359 'command_name': self.command_name}))) | |
| 360 return args | |
| 361 | |
  def __init__(self, command_runner, args, headers, debug, parallel_operations,
               bucket_storage_uri_class, gsutil_api_class_map_factory,
               test_method=None, logging_filters=None,
               command_alias_used=None):
    """Instantiates a Command.

    Args:
      command_runner: CommandRunner (for commands built atop other commands).
      args: Command-line args (arg0 = actual arg, not command name ala bash).
      headers: Dictionary containing optional HTTP headers to pass to boto.
      debug: Debug level to pass in to boto connection (range 0..3).
      parallel_operations: Should command operations be executed in parallel?
      bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
                                Settable for testing/mocking.
      gsutil_api_class_map_factory: Creates map of cloud storage interfaces.
                                    Settable for testing/mocking.
      test_method: Optional general purpose method for testing purposes.
                   Application and semantics of this method will vary by
                   command and test type.
      logging_filters: Optional list of logging.Filters to apply to this
                       command's logger.
      command_alias_used: The alias that was actually used when running this
                          command (as opposed to the "official" command name,
                          which will always correspond to the file name).

    Raises:
      CommandException: if the subclass did not define command_spec, or the
          parsed arguments do not satisfy the command spec.

    Implementation note: subclasses shouldn't need to define an __init__
    method, and instead depend on the shared initialization that happens
    here. If you do define an __init__ method in a subclass you'll need to
    explicitly call super().__init__(). But you're encouraged not to do this,
    because it will make changing the __init__ interface more painful.
    """
    # Save class values from constructor params.
    self.command_runner = command_runner
    self.unparsed_args = args
    self.headers = headers
    self.debug = debug
    self.parallel_operations = parallel_operations
    self.bucket_storage_uri_class = bucket_storage_uri_class
    self.gsutil_api_class_map_factory = gsutil_api_class_map_factory
    self.test_method = test_method
    self.exclude_symlinks = False
    self.recursion_requested = False
    self.all_versions = False
    self.command_alias_used = command_alias_used

    # Global instance of a threaded logger object.
    self.logger = CreateGsutilLogger(self.command_name)
    if logging_filters:
      for log_filter in logging_filters:
        self.logger.addFilter(log_filter)

    if self.command_spec is None:
      raise CommandException('"%s" command implementation is missing a '
                             'command_spec definition.' % self.command_name)

    # Parse and validate args.
    self.args = self._TranslateDeprecatedAliases(args)
    # Populates self.sub_opts and rewrites self.args via getopt.
    self.ParseSubOpts()

    # Named tuple public functions start with _
    # pylint: disable=protected-access
    self.command_spec = self.command_spec._replace(
        urls_start_arg=self._CalculateUrlsStartArg())

    if (len(self.args) < self.command_spec.min_args
        or len(self.args) > self.command_spec.max_args):
      self.RaiseWrongNumberOfArgumentsException()

    # Commands with sub-commands and sub-options validate their own arguments
    # after parsing the sub-command, so skip the generic check for them.
    if self.command_name not in self._commands_with_subcommands_and_subopts:
      self.CheckArguments()

    # Build the support and default maps from the command spec.
    support_map = {
        'gs': self.command_spec.gs_api_support,
        's3': [ApiSelector.XML]
    }
    default_map = {
        'gs': self.command_spec.gs_default_api,
        's3': ApiSelector.XML
    }
    self.gsutil_api_map = GsutilApiMapFactory.GetApiMap(
        self.gsutil_api_class_map_factory, support_map, default_map)

    self.project_id = None
    self.gsutil_api = CloudApiDelegator(
        bucket_storage_uri_class, self.gsutil_api_map,
        self.logger, debug=self.debug)

    # Cross-platform path to run gsutil binary.
    self.gsutil_cmd = ''
    # If running on Windows, invoke python interpreter explicitly.
    # NOTE(review): IS_WINDOWS is also imported directly at the top of this
    # file; gslib.util.IS_WINDOWS refers to the same flag.
    if gslib.util.IS_WINDOWS:
      self.gsutil_cmd += 'python '
    # Add full path to gsutil to make sure we test the correct version.
    self.gsutil_path = gslib.GSUTIL_PATH
    self.gsutil_cmd += self.gsutil_path

    # We're treating recursion_requested like it's used by all commands, but
    # only some of the commands accept the -R option.
    if self.sub_opts:
      for o, unused_a in self.sub_opts:
        if o == '-r' or o == '-R':
          self.recursion_requested = True
          break

    self.multiprocessing_is_available = MultiprocessingIsAvailable()[0]
| 468 | |
| 469 def RaiseWrongNumberOfArgumentsException(self): | |
| 470 """Raises exception for wrong number of arguments supplied to command.""" | |
| 471 if len(self.args) < self.command_spec.min_args: | |
| 472 tail_str = 's' if self.command_spec.min_args > 1 else '' | |
| 473 message = ('The %s command requires at least %d argument%s.' % | |
| 474 (self.command_name, self.command_spec.min_args, tail_str)) | |
| 475 else: | |
| 476 message = ('The %s command accepts at most %d arguments.' % | |
| 477 (self.command_name, self.command_spec.max_args)) | |
| 478 message += ' Usage:\n%s\nFor additional help run:\n gsutil help %s' % ( | |
| 479 self.command_spec.usage_synopsis, self.command_name) | |
| 480 raise CommandException(message) | |
| 481 | |
| 482 def RaiseInvalidArgumentException(self): | |
| 483 """Raises exception for specifying an invalid argument to command.""" | |
| 484 message = ('Incorrect option(s) specified. Usage:\n%s\n' | |
| 485 'For additional help run:\n gsutil help %s' % ( | |
| 486 self.command_spec.usage_synopsis, self.command_name)) | |
| 487 raise CommandException(message) | |
| 488 | |
| 489 def ParseSubOpts(self, check_args=False): | |
| 490 """Parses sub-opt args. | |
| 491 | |
| 492 Args: | |
| 493 check_args: True to have CheckArguments() called after parsing. | |
| 494 | |
| 495 Populates: | |
| 496 (self.sub_opts, self.args) from parsing. | |
| 497 | |
| 498 Raises: RaiseInvalidArgumentException if invalid args specified. | |
| 499 """ | |
| 500 try: | |
| 501 self.sub_opts, self.args = getopt.getopt( | |
| 502 self.args, self.command_spec.supported_sub_args, | |
| 503 self.command_spec.supported_private_args or []) | |
| 504 except getopt.GetoptError: | |
| 505 self.RaiseInvalidArgumentException() | |
| 506 if check_args: | |
| 507 self.CheckArguments() | |
| 508 | |
| 509 def CheckArguments(self): | |
| 510 """Checks that command line arguments match the command_spec. | |
| 511 | |
| 512 Any commands in self._commands_with_subcommands_and_subopts are responsible | |
| 513 for calling this method after handling initial parsing of their arguments. | |
| 514 This prevents commands with sub-commands as well as options from breaking | |
| 515 the parsing of getopt. | |
| 516 | |
| 517 TODO: Provide a function to parse commands and sub-commands more | |
| 518 intelligently once we stop allowing the deprecated command versions. | |
| 519 | |
| 520 Raises: | |
| 521 CommandException if the arguments don't match. | |
| 522 """ | |
| 523 | |
| 524 if (not self.command_spec.file_url_ok | |
| 525 and HaveFileUrls(self.args[self.command_spec.urls_start_arg:])): | |
| 526 raise CommandException('"%s" command does not support "file://" URLs. ' | |
| 527 'Did you mean to use a gs:// URL?' % | |
| 528 self.command_name) | |
| 529 if (not self.command_spec.provider_url_ok | |
| 530 and HaveProviderUrls(self.args[self.command_spec.urls_start_arg:])): | |
| 531 raise CommandException('"%s" command does not support provider-only ' | |
| 532 'URLs.' % self.command_name) | |
| 533 | |
| 534 def WildcardIterator(self, url_string, all_versions=False): | |
| 535 """Helper to instantiate gslib.WildcardIterator. | |
| 536 | |
| 537 Args are same as gslib.WildcardIterator interface, but this method fills in | |
| 538 most of the values from instance state. | |
| 539 | |
| 540 Args: | |
| 541 url_string: URL string naming wildcard objects to iterate. | |
| 542 all_versions: If true, the iterator yields all versions of objects | |
| 543 matching the wildcard. If false, yields just the live | |
| 544 object version. | |
| 545 | |
| 546 Returns: | |
| 547 WildcardIterator for use by caller. | |
| 548 """ | |
| 549 return CreateWildcardIterator( | |
| 550 url_string, self.gsutil_api, all_versions=all_versions, | |
| 551 debug=self.debug, project_id=self.project_id) | |
| 552 | |
| 553 def RunCommand(self): | |
| 554 """Abstract function in base class. Subclasses must implement this. | |
| 555 | |
| 556 The return value of this function will be used as the exit status of the | |
| 557 process, so subclass commands should return an integer exit code (0 for | |
| 558 success, a value in [1,255] for failure). | |
| 559 """ | |
| 560 raise CommandException('Command %s is missing its RunCommand() ' | |
| 561 'implementation' % self.command_name) | |
| 562 | |
| 563 ############################################################ | |
| 564 # Shared helper functions that depend on base class state. # | |
| 565 ############################################################ | |
| 566 | |
  def ApplyAclFunc(self, acl_func, acl_excep_handler, url_strs):
    """Sets the standard or default object ACL depending on self.command_name.

    Assumes the subclass has set self.continue_on_error and
    self.everything_set_okay before calling (neither is set in __init__).

    Args:
      acl_func: ACL function to be passed to Apply.
      acl_excep_handler: ACL exception handler to be passed to Apply.
      url_strs: URL strings on which to set ACL.

    Raises:
      CommandException if an ACL could not be set.
    """
    multi_threaded_url_args = []
    # Handle bucket ACL setting operations single-threaded, because
    # our threading machinery currently assumes it's working with objects
    # (name_expansion_iterator), and normally we wouldn't expect users to need
    # to set ACLs on huge numbers of buckets at once anyway.
    for url_str in url_strs:
      url = StorageUrlFromString(url_str)
      if url.IsCloudUrl() and url.IsBucket():
        if self.recursion_requested:
          # If user specified -R option, convert any bucket args to bucket
          # wildcards (e.g., gs://bucket/*), to prevent the operation from
          # being applied to the buckets themselves.
          url.object_name = '*'
          multi_threaded_url_args.append(url.url_string)
        else:
          # Convert to a NameExpansionResult so we can re-use the threaded
          # function for the single-threaded implementation. RefType is unused.
          for blr in self.WildcardIterator(url.url_string).IterBuckets(
              bucket_fields=['id']):
            name_expansion_for_url = NameExpansionResult(
                url, False, False, blr.storage_url)
            acl_func(self, name_expansion_for_url)
      else:
        multi_threaded_url_args.append(url_str)

    if len(multi_threaded_url_args) >= 1:
      name_expansion_iterator = NameExpansionIterator(
          self.command_name, self.debug,
          self.logger, self.gsutil_api,
          multi_threaded_url_args, self.recursion_requested,
          all_versions=self.all_versions,
          continue_on_error=self.continue_on_error or self.parallel_operations)

      # Perform requests in parallel (-m) mode, if requested, using
      # configured number of parallel processes and threads. Otherwise,
      # perform requests with sequential function calls in current process.
      self.Apply(acl_func, name_expansion_iterator, acl_excep_handler,
                 fail_on_error=not self.continue_on_error)

    # acl_func/acl_excep_handler set everything_set_okay to False on failure
    # (see SetAclExceptionHandler and the _SetAcl* helpers).
    if not self.everything_set_okay and not self.continue_on_error:
      raise CommandException('ACLs for some objects could not be set.')
| 619 | |
  def SetAclFunc(self, name_expansion_result, thread_state=None):
    """Sets the object ACL for the name_expansion_result provided.

    Args:
      name_expansion_result: NameExpansionResult describing the target object.
      thread_state: If present, use this gsutil Cloud API instance for the set.
    """
    if thread_state:
      # No thread_state is expected for default-ACL operations — presumably
      # because those target buckets, which ApplyAclFunc processes
      # single-threaded; verify before relying on this invariant.
      assert not self.def_acl
      gsutil_api = thread_state
    else:
      gsutil_api = self.gsutil_api
    op_string = 'default object ACL' if self.def_acl else 'ACL'
    url = name_expansion_result.expanded_storage_url
    self.logger.info('Setting %s on %s...', op_string, url)
    if (gsutil_api.GetApiSelector(url.scheme) == ApiSelector.XML
        and url.scheme != 'gs'):
      # If we are called with a non-google ACL model, we need to use the XML
      # passthrough. acl_arg should either be a canned ACL or an XML ACL.
      self._SetAclXmlPassthrough(url, gsutil_api)
    else:
      # Normal Cloud API path. acl_arg is a JSON ACL or a canned ACL.
      self._SetAclGsutilApi(url, gsutil_api)
| 643 | |
| 644 def _SetAclXmlPassthrough(self, url, gsutil_api): | |
| 645 """Sets the ACL for the URL provided using the XML passthrough functions. | |
| 646 | |
| 647 This function assumes that self.def_acl, self.canned, | |
| 648 and self.continue_on_error are initialized, and that self.acl_arg is | |
| 649 either an XML string or a canned ACL string. | |
| 650 | |
| 651 Args: | |
| 652 url: CloudURL to set the ACL on. | |
| 653 gsutil_api: gsutil Cloud API to use for the ACL set. Must support XML | |
| 654 passthrough functions. | |
| 655 """ | |
| 656 try: | |
| 657 orig_prefer_api = gsutil_api.prefer_api | |
| 658 gsutil_api.prefer_api = ApiSelector.XML | |
| 659 gsutil_api.XmlPassThroughSetAcl( | |
| 660 self.acl_arg, url, canned=self.canned, | |
| 661 def_obj_acl=self.def_acl, provider=url.scheme) | |
| 662 except ServiceException as e: | |
| 663 if self.continue_on_error: | |
| 664 self.everything_set_okay = False | |
| 665 self.logger.error(e) | |
| 666 else: | |
| 667 raise | |
| 668 finally: | |
| 669 gsutil_api.prefer_api = orig_prefer_api | |
| 670 | |
| 671 def _SetAclGsutilApi(self, url, gsutil_api): | |
| 672 """Sets the ACL for the URL provided using the gsutil Cloud API. | |
| 673 | |
| 674 This function assumes that self.def_acl, self.canned, | |
| 675 and self.continue_on_error are initialized, and that self.acl_arg is | |
| 676 either a JSON string or a canned ACL string. | |
| 677 | |
| 678 Args: | |
| 679 url: CloudURL to set the ACL on. | |
| 680 gsutil_api: gsutil Cloud API to use for the ACL set. | |
| 681 """ | |
| 682 try: | |
| 683 if url.IsBucket(): | |
| 684 if self.def_acl: | |
| 685 if self.canned: | |
| 686 gsutil_api.PatchBucket( | |
| 687 url.bucket_name, apitools_messages.Bucket(), | |
| 688 canned_def_acl=self.acl_arg, provider=url.scheme, fields=['id']) | |
| 689 else: | |
| 690 def_obj_acl = AclTranslation.JsonToMessage( | |
| 691 self.acl_arg, apitools_messages.ObjectAccessControl) | |
| 692 bucket_metadata = apitools_messages.Bucket( | |
| 693 defaultObjectAcl=def_obj_acl) | |
| 694 gsutil_api.PatchBucket(url.bucket_name, bucket_metadata, | |
| 695 provider=url.scheme, fields=['id']) | |
| 696 else: | |
| 697 if self.canned: | |
| 698 gsutil_api.PatchBucket( | |
| 699 url.bucket_name, apitools_messages.Bucket(), | |
| 700 canned_acl=self.acl_arg, provider=url.scheme, fields=['id']) | |
| 701 else: | |
| 702 bucket_acl = AclTranslation.JsonToMessage( | |
| 703 self.acl_arg, apitools_messages.BucketAccessControl) | |
| 704 bucket_metadata = apitools_messages.Bucket(acl=bucket_acl) | |
| 705 gsutil_api.PatchBucket(url.bucket_name, bucket_metadata, | |
| 706 provider=url.scheme, fields=['id']) | |
| 707 else: # url.IsObject() | |
| 708 if self.canned: | |
| 709 gsutil_api.PatchObjectMetadata( | |
| 710 url.bucket_name, url.object_name, apitools_messages.Object(), | |
| 711 provider=url.scheme, generation=url.generation, | |
| 712 canned_acl=self.acl_arg) | |
| 713 else: | |
| 714 object_acl = AclTranslation.JsonToMessage( | |
| 715 self.acl_arg, apitools_messages.ObjectAccessControl) | |
| 716 object_metadata = apitools_messages.Object(acl=object_acl) | |
| 717 gsutil_api.PatchObjectMetadata(url.bucket_name, url.object_name, | |
| 718 object_metadata, provider=url.scheme, | |
| 719 generation=url.generation) | |
| 720 except ArgumentException, e: | |
| 721 raise | |
| 722 except ServiceException, e: | |
| 723 if self.continue_on_error: | |
| 724 self.everything_set_okay = False | |
| 725 self.logger.error(e) | |
| 726 else: | |
| 727 raise | |
| 728 | |
| 729 def SetAclCommandHelper(self, acl_func, acl_excep_handler): | |
| 730 """Sets ACLs on the self.args using the passed-in acl function. | |
| 731 | |
| 732 Args: | |
| 733 acl_func: ACL function to be passed to Apply. | |
| 734 acl_excep_handler: ACL exception handler to be passed to Apply. | |
| 735 """ | |
| 736 acl_arg = self.args[0] | |
| 737 url_args = self.args[1:] | |
| 738 # Disallow multi-provider setacl requests, because there are differences in | |
| 739 # the ACL models. | |
| 740 if not UrlsAreForSingleProvider(url_args): | |
| 741 raise CommandException('"%s" command spanning providers not allowed.' % | |
| 742 self.command_name) | |
| 743 | |
| 744 # Determine whether acl_arg names a file containing XML ACL text vs. the | |
| 745 # string name of a canned ACL. | |
| 746 if os.path.isfile(acl_arg): | |
| 747 with codecs.open(acl_arg, 'r', UTF8) as f: | |
| 748 acl_arg = f.read() | |
| 749 self.canned = False | |
| 750 else: | |
| 751 # No file exists, so expect a canned ACL string. | |
| 752 # Canned ACLs are not supported in JSON and we need to use the XML API | |
| 753 # to set them. | |
| 754 # validate=False because we allow wildcard urls. | |
| 755 storage_uri = boto.storage_uri( | |
| 756 url_args[0], debug=self.debug, validate=False, | |
| 757 bucket_storage_uri_class=self.bucket_storage_uri_class) | |
| 758 | |
| 759 canned_acls = storage_uri.canned_acls() | |
| 760 if acl_arg not in canned_acls: | |
| 761 raise CommandException('Invalid canned ACL "%s".' % acl_arg) | |
| 762 self.canned = True | |
| 763 | |
| 764 # Used to track if any ACLs failed to be set. | |
| 765 self.everything_set_okay = True | |
| 766 self.acl_arg = acl_arg | |
| 767 | |
| 768 self.ApplyAclFunc(acl_func, acl_excep_handler, url_args) | |
| 769 if not self.everything_set_okay and not self.continue_on_error: | |
| 770 raise CommandException('ACLs for some objects could not be set.') | |
| 771 | |
| 772 def _WarnServiceAccounts(self): | |
| 773 """Warns service account users who have received an AccessDenied error. | |
| 774 | |
| 775 When one of the metadata-related commands fails due to AccessDenied, user | |
| 776 must ensure that they are listed as an Owner in the API console. | |
| 777 """ | |
| 778 # Import this here so that the value will be set first in | |
| 779 # gcs_oauth2_boto_plugin. | |
| 780 # pylint: disable=g-import-not-at-top | |
| 781 from gcs_oauth2_boto_plugin.oauth2_plugin import IS_SERVICE_ACCOUNT | |
| 782 | |
| 783 if IS_SERVICE_ACCOUNT: | |
| 784 # This method is only called when canned ACLs are used, so the warning | |
| 785 # definitely applies. | |
| 786 self.logger.warning('\n'.join(textwrap.wrap( | |
| 787 'It appears that your service account has been denied access while ' | |
| 788 'attempting to perform a metadata operation. If you believe that you ' | |
| 789 'should have access to this metadata (i.e., if it is associated with ' | |
| 790 'your account), please make sure that your service account''s email ' | |
| 791 'address is listed as an Owner in the Team tab of the API console. ' | |
| 792 'See "gsutil help creds" for further information.\n'))) | |
| 793 | |
| 794 def GetAndPrintAcl(self, url_str): | |
| 795 """Prints the standard or default object ACL depending on self.command_name. | |
| 796 | |
| 797 Args: | |
| 798 url_str: URL string to get ACL for. | |
| 799 """ | |
| 800 blr = self.GetAclCommandBucketListingReference(url_str) | |
| 801 url = StorageUrlFromString(url_str) | |
| 802 if (self.gsutil_api.GetApiSelector(url.scheme) == ApiSelector.XML | |
| 803 and url.scheme != 'gs'): | |
| 804 # Need to use XML passthrough. | |
| 805 try: | |
| 806 acl = self.gsutil_api.XmlPassThroughGetAcl( | |
| 807 url, def_obj_acl=self.def_acl, provider=url.scheme) | |
| 808 print acl.to_xml() | |
| 809 except AccessDeniedException, _: | |
| 810 self._WarnServiceAccounts() | |
| 811 raise | |
| 812 else: | |
| 813 if self.command_name == 'defacl': | |
| 814 acl = blr.root_object.defaultObjectAcl | |
| 815 if not acl: | |
| 816 self.logger.warn( | |
| 817 'No default object ACL present for %s. This could occur if ' | |
| 818 'the default object ACL is private, in which case objects ' | |
| 819 'created in this bucket will be readable only by their ' | |
| 820 'creators. It could also mean you do not have OWNER permission ' | |
| 821 'on %s and therefore do not have permission to read the ' | |
| 822 'default object ACL.', url_str, url_str) | |
| 823 else: | |
| 824 acl = blr.root_object.acl | |
| 825 if not acl: | |
| 826 self._WarnServiceAccounts() | |
| 827 raise AccessDeniedException('Access denied. Please ensure you have ' | |
| 828 'OWNER permission on %s.' % url_str) | |
| 829 print AclTranslation.JsonFromMessage(acl) | |
| 830 | |
| 831 def GetAclCommandBucketListingReference(self, url_str): | |
| 832 """Gets a single bucket listing reference for an acl get command. | |
| 833 | |
| 834 Args: | |
| 835 url_str: URL string to get the bucket listing reference for. | |
| 836 | |
| 837 Returns: | |
| 838 BucketListingReference for the URL string. | |
| 839 | |
| 840 Raises: | |
| 841 CommandException if string did not result in exactly one reference. | |
| 842 """ | |
| 843 # We're guaranteed by caller that we have the appropriate type of url | |
| 844 # string for the call (ex. we will never be called with an object string | |
| 845 # by getdefacl) | |
| 846 wildcard_url = StorageUrlFromString(url_str) | |
| 847 if wildcard_url.IsObject(): | |
| 848 plurality_iter = PluralityCheckableIterator( | |
| 849 self.WildcardIterator(url_str).IterObjects( | |
| 850 bucket_listing_fields=['acl'])) | |
| 851 else: | |
| 852 # Bucket or provider. We call IterBuckets explicitly here to ensure that | |
| 853 # the root object is populated with the acl. | |
| 854 if self.command_name == 'defacl': | |
| 855 bucket_fields = ['defaultObjectAcl'] | |
| 856 else: | |
| 857 bucket_fields = ['acl'] | |
| 858 plurality_iter = PluralityCheckableIterator( | |
| 859 self.WildcardIterator(url_str).IterBuckets( | |
| 860 bucket_fields=bucket_fields)) | |
| 861 if plurality_iter.IsEmpty(): | |
| 862 raise CommandException('No URLs matched') | |
| 863 if plurality_iter.HasPlurality(): | |
| 864 raise CommandException( | |
| 865 '%s matched more than one URL, which is not allowed by the %s ' | |
| 866 'command' % (url_str, self.command_name)) | |
| 867 return list(plurality_iter)[0] | |
| 868 | |
  def _HandleMultiProcessingSigs(self, unused_signal_num,
                                 unused_cur_stack_frame):
    """Handles signals INT AND TERM during a multi-process/multi-thread request.

    Kills subprocesses.

    Args:
      unused_signal_num: signal generated by ^C.
      unused_cur_stack_frame: Current stack frame.
    """
    # Note: This only works under Linux/MacOS. See
    # https://github.com/GoogleCloudPlatform/gsutil/issues/99 for details
    # about why making it work correctly across OS's is harder and still open.
    # Shut down all gsutil child processes before terminating ourselves.
    ShutDownGsutil()
    sys.stderr.write('Caught ^C - exiting\n')
    # Simply calling sys.exit(1) doesn't work - see above bug for details.
    KillProcess(os.getpid())
| 886 | |
| 887 def GetSingleBucketUrlFromArg(self, arg, bucket_fields=None): | |
| 888 """Gets a single bucket URL based on the command arguments. | |
| 889 | |
| 890 Args: | |
| 891 arg: String argument to get bucket URL for. | |
| 892 bucket_fields: Fields to populate for the bucket. | |
| 893 | |
| 894 Returns: | |
| 895 (StorageUrl referring to a single bucket, Bucket metadata). | |
| 896 | |
| 897 Raises: | |
| 898 CommandException if args did not match exactly one bucket. | |
| 899 """ | |
| 900 plurality_checkable_iterator = self.GetBucketUrlIterFromArg( | |
| 901 arg, bucket_fields=bucket_fields) | |
| 902 if plurality_checkable_iterator.HasPlurality(): | |
| 903 raise CommandException( | |
| 904 '%s matched more than one URL, which is not\n' | |
| 905 'allowed by the %s command' % (arg, self.command_name)) | |
| 906 blr = list(plurality_checkable_iterator)[0] | |
| 907 return StorageUrlFromString(blr.url_string), blr.root_object | |
| 908 | |
| 909 def GetBucketUrlIterFromArg(self, arg, bucket_fields=None): | |
| 910 """Gets a single bucket URL based on the command arguments. | |
| 911 | |
| 912 Args: | |
| 913 arg: String argument to iterate over. | |
| 914 bucket_fields: Fields to populate for the bucket. | |
| 915 | |
| 916 Returns: | |
| 917 PluralityCheckableIterator over buckets. | |
| 918 | |
| 919 Raises: | |
| 920 CommandException if iterator matched no buckets. | |
| 921 """ | |
| 922 arg_url = StorageUrlFromString(arg) | |
| 923 if not arg_url.IsCloudUrl() or arg_url.IsObject(): | |
| 924 raise CommandException('"%s" command must specify a bucket' % | |
| 925 self.command_name) | |
| 926 | |
| 927 plurality_checkable_iterator = PluralityCheckableIterator( | |
| 928 self.WildcardIterator(arg).IterBuckets( | |
| 929 bucket_fields=bucket_fields)) | |
| 930 if plurality_checkable_iterator.IsEmpty(): | |
| 931 raise CommandException('No URLs matched') | |
| 932 return plurality_checkable_iterator | |
| 933 | |
| 934 ###################### | |
| 935 # Private functions. # | |
| 936 ###################### | |
| 937 | |
| 938 def _ResetConnectionPool(self): | |
| 939 # Each OS process needs to establish its own set of connections to | |
| 940 # the server to avoid writes from different OS processes interleaving | |
| 941 # onto the same socket (and garbling the underlying SSL session). | |
| 942 # We ensure each process gets its own set of connections here by | |
| 943 # closing all connections in the storage provider connection pool. | |
| 944 connection_pool = StorageUri.provider_pool | |
| 945 if connection_pool: | |
| 946 for i in connection_pool: | |
| 947 connection_pool[i].connection.close() | |
| 948 | |
| 949 def _GetProcessAndThreadCount(self, process_count, thread_count, | |
| 950 parallel_operations_override): | |
| 951 """Determines the values of process_count and thread_count. | |
| 952 | |
| 953 These values are used for parallel operations. | |
| 954 If we're not performing operations in parallel, then ignore | |
| 955 existing values and use process_count = thread_count = 1. | |
| 956 | |
| 957 Args: | |
| 958 process_count: A positive integer or None. In the latter case, we read | |
| 959 the value from the .boto config file. | |
| 960 thread_count: A positive integer or None. In the latter case, we read | |
| 961 the value from the .boto config file. | |
| 962 parallel_operations_override: Used to override self.parallel_operations. | |
| 963 This allows the caller to safely override | |
| 964 the top-level flag for a single call. | |
| 965 | |
| 966 Returns: | |
| 967 (process_count, thread_count): The number of processes and threads to use, | |
| 968 respectively. | |
| 969 """ | |
| 970 # Set OS process and python thread count as a function of options | |
| 971 # and config. | |
| 972 if self.parallel_operations or parallel_operations_override: | |
| 973 if not process_count: | |
| 974 process_count = boto.config.getint( | |
| 975 'GSUtil', 'parallel_process_count', | |
| 976 gslib.commands.config.DEFAULT_PARALLEL_PROCESS_COUNT) | |
| 977 if process_count < 1: | |
| 978 raise CommandException('Invalid parallel_process_count "%d".' % | |
| 979 process_count) | |
| 980 if not thread_count: | |
| 981 thread_count = boto.config.getint( | |
| 982 'GSUtil', 'parallel_thread_count', | |
| 983 gslib.commands.config.DEFAULT_PARALLEL_THREAD_COUNT) | |
| 984 if thread_count < 1: | |
| 985 raise CommandException('Invalid parallel_thread_count "%d".' % | |
| 986 thread_count) | |
| 987 else: | |
| 988 # If -m not specified, then assume 1 OS process and 1 Python thread. | |
| 989 process_count = 1 | |
| 990 thread_count = 1 | |
| 991 | |
| 992 if IS_WINDOWS and process_count > 1: | |
| 993 raise CommandException('\n'.join(textwrap.wrap( | |
| 994 ('It is not possible to set process_count > 1 on Windows. Please ' | |
| 995 'update your config file (located at %s) and set ' | |
| 996 '"parallel_process_count = 1".') % | |
| 997 GetConfigFilePath()))) | |
| 998 self.logger.debug('process count: %d', process_count) | |
| 999 self.logger.debug('thread count: %d', thread_count) | |
| 1000 | |
| 1001 return (process_count, thread_count) | |
| 1002 | |
  def _SetUpPerCallerState(self):
    """Set up the state for a caller id, corresponding to one Apply call.

    Returns:
      The newly allocated caller id.
    """
    # Get a new caller ID. The lock makes the increment-and-read of the shared
    # counter atomic, so concurrent Apply calls receive distinct IDs.
    with caller_id_lock:
      caller_id_counter.value += 1
      caller_id = caller_id_counter.value

    # Create a copy of self with an incremented recursive level. This allows
    # the class to report its level correctly if the function called from it
    # also needs to call Apply.
    cls = copy.copy(self)
    cls.recursive_apply_level += 1

    # Thread-safe loggers can't be pickled, so we will remove it here and
    # recreate it later in the WorkerThread. This is not a problem since any
    # logger with the same name will be treated as a singleton.
    cls.logger = None

    # Likewise, the default API connection can't be pickled, but it is unused
    # anyway as each thread gets its own API delegator.
    cls.gsutil_api = None

    # Register this caller's bookkeeping entries in the module-level maps.
    class_map[caller_id] = cls
    total_tasks[caller_id] = -1  # -1 => the producer hasn't finished yet.
    call_completed_map[caller_id] = False
    caller_id_finished_count.Put(caller_id, 0)
    global_return_values_map.Put(caller_id, [])
    return caller_id
| 1031 | |
  def _CreateNewConsumerPool(self, num_processes, num_threads):
    """Create a new pool of processes that call _ApplyThreads.

    Args:
      num_processes: Number of worker processes to spawn for this pool.
      num_threads: Number of threads each worker process will use.

    Raises:
      CommandException: if the recursion depth of Apply calls is exceeded.
    """
    processes = []
    task_queue = _NewMultiprocessingQueue()
    task_queues.append(task_queue)

    # Guard against unbounded recursion in nested Apply calls.
    current_max_recursive_level.value += 1
    if current_max_recursive_level.value > MAX_RECURSIVE_DEPTH:
      raise CommandException('Recursion depth of Apply calls is too great.')
    for _ in range(num_processes):
      recursive_apply_level = len(consumer_pools)
      p = multiprocessing.Process(
          target=self._ApplyThreads,
          args=(num_threads, num_processes, recursive_apply_level))
      # Daemon processes are terminated automatically when the parent exits.
      p.daemon = True
      processes.append(p)
      p.start()
    consumer_pool = _ConsumerPool(processes, task_queue)
    consumer_pools.append(consumer_pool)
| 1051 | |
  def Apply(self, func, args_iterator, exception_handler,
            shared_attrs=None, arg_checker=_UrlArgChecker,
            parallel_operations_override=False, process_count=None,
            thread_count=None, should_return_results=False,
            fail_on_error=False):
    """Calls _Parallel/SequentialApply based on multiprocessing availability.

    Args:
      func: Function to call to process each argument.
      args_iterator: Iterable collection of arguments to be put into the
                     work queue.
      exception_handler: Exception handler for WorkerThread class.
      shared_attrs: List of attributes to manage across sub-processes.
      arg_checker: Used to determine whether we should process the current
                   argument or simply skip it. Also handles any logging that
                   is specific to a particular type of argument.
      parallel_operations_override: Used to override self.parallel_operations.
                                    This allows the caller to safely override
                                    the top-level flag for a single call.
      process_count: The number of processes to use. If not specified, then
                     the configured default will be used.
      thread_count: The number of threads per process. If not specified, then
                    the configured default will be used.
      should_return_results: If true, then return the results of all successful
                             calls to func in a list.
      fail_on_error: If true, then raise any exceptions encountered when
                     executing func. This is only applicable in the case of
                     process_count == thread_count == 1.

    Returns:
      Results from spawned threads.
    """
    if shared_attrs:
      original_shared_vars_values = {}  # We'll add these back in at the end.
      for name in shared_attrs:
        original_shared_vars_values[name] = getattr(self, name)
        # By setting this to 0, we simplify the logic for computing deltas.
        # We'll add it back after all of the tasks have been performed.
        setattr(self, name, 0)

    (process_count, thread_count) = self._GetProcessAndThreadCount(
        process_count, thread_count, parallel_operations_override)

    # This is the top-level Apply call iff we are not inside a recursive Apply
    # and no sequential caller id has been handed out yet.
    is_main_thread = (self.recursive_apply_level == 0
                      and self.sequential_caller_id == -1)

    # We don't honor the fail_on_error flag in the case of multiple threads
    # or processes.
    fail_on_error = fail_on_error and (process_count * thread_count == 1)

    # Only check this from the first call in the main thread. Apart from the
    # fact that it's wasteful to try this multiple times in general, it also
    # will never work when called from a subprocess since we use daemon
    # processes, and daemons can't create other processes.
    if is_main_thread:
      if ((not self.multiprocessing_is_available)
          and thread_count * process_count > 1):
        # Run the check again and log the appropriate warnings. This was run
        # before, when the Command object was created, in order to calculate
        # self.multiprocessing_is_available, but we don't want to print the
        # warning until we're sure the user actually tried to use multiple
        # threads or processes.
        MultiprocessingIsAvailable(logger=self.logger)

    if self.multiprocessing_is_available:
      caller_id = self._SetUpPerCallerState()
    else:
      self.sequential_caller_id += 1
      caller_id = self.sequential_caller_id

      if is_main_thread:
        # pylint: disable=global-variable-undefined
        global global_return_values_map, shared_vars_map, failure_count
        global caller_id_finished_count, shared_vars_list_map
        global_return_values_map = BasicIncrementDict()
        global_return_values_map.Put(caller_id, [])
        shared_vars_map = BasicIncrementDict()
        caller_id_finished_count = BasicIncrementDict()
        shared_vars_list_map = {}
        failure_count = 0

    # If any shared attributes passed by caller, create a dictionary of
    # shared memory variables for every element in the list of shared
    # attributes.
    if shared_attrs:
      shared_vars_list_map[caller_id] = shared_attrs
      for name in shared_attrs:
        shared_vars_map.Put((caller_id, name), 0)

    # Make all of the requested function calls.
    if self.multiprocessing_is_available and thread_count * process_count > 1:
      self._ParallelApply(func, args_iterator, exception_handler, caller_id,
                          arg_checker, process_count, thread_count,
                          should_return_results, fail_on_error)
    else:
      self._SequentialApply(func, args_iterator, exception_handler, caller_id,
                            arg_checker, should_return_results, fail_on_error)

    if shared_attrs:
      for name in shared_attrs:
        # This allows us to retain the original value of the shared variable,
        # and simply apply the delta after what was done during the call to
        # apply.
        final_value = (original_shared_vars_values[name] +
                       shared_vars_map.Get((caller_id, name)))
        setattr(self, name, final_value)

    if should_return_results:
      return global_return_values_map.Get(caller_id)
| 1161 | |
| 1162 def _MaybeSuggestGsutilDashM(self): | |
| 1163 """Outputs a sugestion to the user to use gsutil -m.""" | |
| 1164 if not (boto.config.getint('GSUtil', 'parallel_process_count', 0) == 1 and | |
| 1165 boto.config.getint('GSUtil', 'parallel_thread_count', 0) == 1): | |
| 1166 self.logger.info('\n' + textwrap.fill( | |
| 1167 '==> NOTE: You are performing a sequence of gsutil operations that ' | |
| 1168 'may run significantly faster if you instead use gsutil -m %s ...\n' | |
| 1169 'Please see the -m section under "gsutil help options" for further ' | |
| 1170 'information about when gsutil -m can be advantageous.' | |
| 1171 % sys.argv[1]) + '\n') | |
| 1172 | |
| 1173 # pylint: disable=g-doc-args | |
| 1174 def _SequentialApply(self, func, args_iterator, exception_handler, caller_id, | |
| 1175 arg_checker, should_return_results, fail_on_error): | |
| 1176 """Performs all function calls sequentially in the current thread. | |
| 1177 | |
| 1178 No other threads or processes will be spawned. This degraded functionality | |
| 1179 is used when the multiprocessing module is not available or the user | |
| 1180 requests only one thread and one process. | |
| 1181 """ | |
| 1182 # Create a WorkerThread to handle all of the logic needed to actually call | |
| 1183 # the function. Note that this thread will never be started, and all work | |
| 1184 # is done in the current thread. | |
| 1185 worker_thread = WorkerThread(None, False) | |
| 1186 args_iterator = iter(args_iterator) | |
| 1187 # Count of sequential calls that have been made. Used for producing | |
| 1188 # suggestion to use gsutil -m. | |
| 1189 sequential_call_count = 0 | |
| 1190 while True: | |
| 1191 | |
| 1192 # Try to get the next argument, handling any exceptions that arise. | |
| 1193 try: | |
| 1194 args = args_iterator.next() | |
| 1195 except StopIteration, e: | |
| 1196 break | |
| 1197 except Exception, e: # pylint: disable=broad-except | |
| 1198 _IncrementFailureCount() | |
| 1199 if fail_on_error: | |
| 1200 raise | |
| 1201 else: | |
| 1202 try: | |
| 1203 exception_handler(self, e) | |
| 1204 except Exception, _: # pylint: disable=broad-except | |
| 1205 self.logger.debug( | |
| 1206 'Caught exception while handling exception for %s:\n%s', | |
| 1207 func, traceback.format_exc()) | |
| 1208 continue | |
| 1209 | |
| 1210 sequential_call_count += 1 | |
| 1211 if sequential_call_count == OFFER_GSUTIL_M_SUGGESTION_THRESHOLD: | |
| 1212 # Output suggestion near beginning of run, so user sees it early and can | |
| 1213 # ^C and try gsutil -m. | |
| 1214 self._MaybeSuggestGsutilDashM() | |
| 1215 if arg_checker(self, args): | |
| 1216 # Now that we actually have the next argument, perform the task. | |
| 1217 task = Task(func, args, caller_id, exception_handler, | |
| 1218 should_return_results, arg_checker, fail_on_error) | |
| 1219 worker_thread.PerformTask(task, self) | |
| 1220 if sequential_call_count >= gslib.util.GetTermLines(): | |
| 1221 # Output suggestion at end of long run, in case user missed it at the | |
| 1222 # start and it scrolled off-screen. | |
| 1223 self._MaybeSuggestGsutilDashM() | |
| 1224 | |
  # pylint: disable=g-doc-args
  def _ParallelApply(self, func, args_iterator, exception_handler, caller_id,
                     arg_checker, process_count, thread_count,
                     should_return_results, fail_on_error):
    """Dispatches input arguments across a thread/process pool.

    Pools are composed of parallel OS processes and/or Python threads,
    based on options (-m or not) and settings in the user's config file.

    If only one OS process is requested/available, dispatch requests across
    threads in the current OS process.

    In the multi-process case, we will create one pool of worker processes for
    each level of the tree of recursive calls to Apply. E.g., if A calls
    Apply(B), and B ultimately calls Apply(C) followed by Apply(D), then we
    will only create two sets of worker processes - B will execute in the first,
    and C and D will execute in the second. If C is then changed to call
    Apply(E) and D is changed to call Apply(F), then we will automatically
    create a third set of processes (lazily, when needed) that will be used to
    execute calls to E and F. This might look something like:

    Pool1 Executes:                B
                                  / \\
    Pool2 Executes:              C   D
                                / \\
    Pool3 Executes:            E   F

    Apply's parallelism is generally broken up into 4 cases:
    - If process_count == thread_count == 1, then all tasks will be executed
      by _SequentialApply.
    - If process_count > 1 and thread_count == 1, then the main thread will
      create a new pool of processes (if they don't already exist) and each of
      those processes will execute the tasks in a single thread.
    - If process_count == 1 and thread_count > 1, then this process will create
      a new pool of threads to execute the tasks.
    - If process_count > 1 and thread_count > 1, then the main thread will
      create a new pool of processes (if they don't already exist) and each of
      those processes will, upon creation, create a pool of threads to
      execute the tasks.

    Args:
      caller_id: The caller ID unique to this call to command.Apply.
      See command.Apply for description of other arguments.
    """
    is_main_thread = self.recursive_apply_level == 0

    # Catch SIGINT and SIGTERM under Linux/MacOs so we can do cleanup before
    # exiting.
    if not IS_WINDOWS and is_main_thread:
      # Register as a final signal handler because this handler kills the
      # main gsutil process (so it must run last).
      RegisterSignalHandler(signal.SIGINT, self._HandleMultiProcessingSigs,
                            is_final_handler=True)
      RegisterSignalHandler(signal.SIGTERM, self._HandleMultiProcessingSigs,
                            is_final_handler=True)

    if not task_queues:
      # The process we create will need to access the next recursive level
      # of task queues if it makes a call to Apply, so we always keep around
      # one more queue than we know we need. OTOH, if we don't create a new
      # process, the existing process still needs a task queue to use.
      task_queues.append(_NewMultiprocessingQueue())

    if process_count > 1:  # Handle process pool creation.
      # Check whether this call will need a new set of workers.

      # Each worker must acquire a shared lock before notifying the main thread
      # that it needs a new worker pool, so that at most one worker asks for
      # a new worker pool at once.
      try:
        if not is_main_thread:
          worker_checking_level_lock.acquire()
        if self.recursive_apply_level >= current_max_recursive_level.value:
          with need_pool_or_done_cond:
            # Only the main thread is allowed to create new processes -
            # otherwise, we will run into some Python bugs.
            if is_main_thread:
              self._CreateNewConsumerPool(process_count, thread_count)
            else:
              # Notify the main thread that we need a new consumer pool.
              new_pool_needed.value = 1
              need_pool_or_done_cond.notify_all()
              # The main thread will notify us when it finishes.
              need_pool_or_done_cond.wait()
      finally:
        if not is_main_thread:
          worker_checking_level_lock.release()

    # If we're running in this process, create a separate task queue. Otherwise,
    # if Apply has already been called with process_count > 1, then there will
    # be consumer pools trying to use our processes.
    if process_count > 1:
      task_queue = task_queues[self.recursive_apply_level]
    else:
      task_queue = _NewMultiprocessingQueue()

    # Kick off a producer thread to throw tasks in the global task queue. We
    # do this asynchronously so that the main thread can be free to create new
    # consumer pools when needed (otherwise, any thread with a task that needs
    # a new consumer pool must block until we're completely done producing; in
    # the worst case, every worker blocks on such a call and the producer fills
    # up the task queue before it finishes, so we block forever).
    # NOTE(review): no start() call appears here, so ProducerThread presumably
    # starts itself in its constructor - confirm against its definition.
    producer_thread = ProducerThread(copy.copy(self), args_iterator, caller_id,
                                     func, task_queue, should_return_results,
                                     exception_handler, arg_checker,
                                     fail_on_error)

    if process_count > 1:
      # Wait here until either:
      #   1. We're the main thread and someone needs a new consumer pool - in
      #      which case we create one and continue waiting.
      #   2. Someone notifies us that all of the work we requested is done, in
      #      which case we retrieve the results (if applicable) and stop
      #      waiting.
      while True:
        with need_pool_or_done_cond:
          # Either our call is done, or someone needs a new level of consumer
          # pools, or the wakeup call was meant for someone else. It's
          # impossible for both conditions to be true, since the main thread is
          # blocked on any other ongoing calls to Apply, and a thread would not
          # ask for a new consumer pool unless it had more work to do.
          if call_completed_map[caller_id]:
            break
          elif is_main_thread and new_pool_needed.value:
            new_pool_needed.value = 0
            self._CreateNewConsumerPool(process_count, thread_count)
            need_pool_or_done_cond.notify_all()

          # Note that we must check the above conditions before the wait() call;
          # otherwise, the notification can happen before we start waiting, in
          # which case we'll block forever.
          need_pool_or_done_cond.wait()
    else:  # Using a single process.
      self._ApplyThreads(thread_count, process_count,
                         self.recursive_apply_level,
                         is_blocking_call=True, task_queue=task_queue)

    # We encountered an exception from the producer thread before any arguments
    # were enqueued, but it wouldn't have been propagated, so we'll now
    # explicitly raise it here.
    if producer_thread.unknown_exception:
      # pylint: disable=raising-bad-type
      raise producer_thread.unknown_exception

    # We encountered an exception from the producer thread while iterating over
    # the arguments, so raise it here if we're meant to fail on error.
    if producer_thread.iterator_exception and fail_on_error:
      # pylint: disable=raising-bad-type
      raise producer_thread.iterator_exception
| 1374 | |
  def _ApplyThreads(self, thread_count, process_count, recursive_apply_level,
                    is_blocking_call=False, task_queue=None):
    """Assigns the work from the multi-process global task queue.

    Work is assigned to an individual process for later consumption either by
    the WorkerThreads or (if thread_count == 1) this thread.

    Args:
      thread_count: The number of threads used to perform the work. If 1, then
                    perform all work in this thread.
      process_count: The number of processes used to perform the work.
      recursive_apply_level: The depth in the tree of recursive calls to Apply
                             of this thread.
      is_blocking_call: True iff the call to Apply is blocked on this call
                        (which is true iff process_count == 1), implying that
                        _ApplyThreads must behave as a blocking call.
      task_queue: Queue from which tasks are drawn; if None, defaults to the
                  global task queue for recursive_apply_level.
    """
    self._ResetConnectionPool()
    self.recursive_apply_level = recursive_apply_level

    task_queue = task_queue or task_queues[recursive_apply_level]

    # With a single thread and process there would be no worker to hand tasks
    # to; callers are expected to execute work inline in that case.
    assert thread_count * process_count > 1, (
        'Invalid state, calling command._ApplyThreads with only one thread '
        'and process.')
    worker_pool = WorkerPool(
        thread_count, self.logger,
        bucket_storage_uri_class=self.bucket_storage_uri_class,
        gsutil_api_map=self.gsutil_api_map, debug=self.debug)

    num_enqueued = 0
    while True:
      task = task_queue.get()
      if task.args != ZERO_TASKS_TO_DO_ARGUMENT:
        # If we have no tasks to do and we're performing a blocking call, we
        # need a special signal to tell us to stop - otherwise, we block on
        # the call to task_queue.get() forever.
        worker_pool.AddTask(task)
        num_enqueued += 1

      if is_blocking_call:
        num_to_do = total_tasks[task.caller_id]
        # The producer thread won't enqueue the last task until after it has
        # updated total_tasks[caller_id], so we know that num_to_do < 0 implies
        # we will do this check again.
        if num_to_do >= 0 and num_enqueued == num_to_do:
          if thread_count == 1:
            return
          else:
            while True:
              with need_pool_or_done_cond:
                if call_completed_map[task.caller_id]:
                  # We need to check this first, in case the condition was
                  # notified before we grabbed the lock.
                  return
                need_pool_or_done_cond.wait()
| 1431 | |
| 1432 | |
| 1433 # Below here lie classes and functions related to controlling the flow of tasks | |
| 1434 # between various threads and processes. | |
| 1435 | |
| 1436 | |
| 1437 class _ConsumerPool(object): | |
| 1438 | |
| 1439 def __init__(self, processes, task_queue): | |
| 1440 self.processes = processes | |
| 1441 self.task_queue = task_queue | |
| 1442 | |
| 1443 def ShutDown(self): | |
| 1444 for process in self.processes: | |
| 1445 KillProcess(process.pid) | |
| 1446 | |
| 1447 | |
def KillProcess(pid):
  """Make best effort to kill the given process.

  We ignore all exceptions so a caller looping through a list of processes will
  continue attempting to kill each, even if one encounters a problem.

  Args:
    pid: The process ID.
  """
  try:
    # os.kill doesn't work in 2.X or 3.Y on Windows for any X < 7 or Y < 2.
    if IS_WINDOWS and ((2, 6) <= sys.version_info[:3] < (2, 7) or
                       (3, 0) <= sys.version_info[:3] < (3, 2)):
      kernel32 = ctypes.windll.kernel32
      # 1 is the PROCESS_TERMINATE access right.
      handle = kernel32.OpenProcess(1, 0, pid)
      try:
        kernel32.TerminateProcess(handle, 0)
      finally:
        # Close the handle returned by OpenProcess; the previous version
        # leaked it, which keeps a kernel object alive per killed process.
        kernel32.CloseHandle(handle)
    else:
      os.kill(pid, signal.SIGKILL)
  except:  # pylint: disable=bare-except
    pass
| 1468 | |
| 1469 | |
class Task(namedtuple('Task', [
    'func', 'args', 'caller_id', 'exception_handler',
    'should_return_results', 'arg_checker', 'fail_on_error'])):
  """Immutable unit of work consumed by worker threads.

  Fields:
    func: The function to be executed.
    args: The arguments to func.
    caller_id: The globally-unique caller ID corresponding to the Apply call.
    exception_handler: The exception handler to use if the call to func fails.
    should_return_results: True iff the results of this function should be
                           returned from the Apply call.
    arg_checker: Used to determine whether we should process the current
                 argument or simply skip it. Also handles any logging that
                 is specific to a particular type of argument.
    fail_on_error: If true, then raise any exceptions encountered when
                   executing func. This is only applicable in the case of
                   process_count == thread_count == 1.
  """
| 1490 | |
| 1491 | |
class ProducerThread(threading.Thread):
  """Thread used to enqueue work for other processes and threads."""

  def __init__(self, cls, args_iterator, caller_id, func, task_queue,
               should_return_results, exception_handler, arg_checker,
               fail_on_error):
    """Initializes the producer thread.

    Args:
      cls: Instance of Command for which this ProducerThread was created.
      args_iterator: Iterable collection of arguments to be put into the
                     work queue.
      caller_id: Globally-unique caller ID corresponding to this call to Apply.
      func: The function to be called on each element of args_iterator.
      task_queue: The queue into which tasks will be put, to later be consumed
                  by Command._ApplyThreads.
      should_return_results: True iff the results for this call to command.Apply
                             were requested.
      exception_handler: The exception handler to use when errors are
                         encountered during calls to func.
      arg_checker: Used to determine whether we should process the current
                   argument or simply skip it. Also handles any logging that
                   is specific to a particular type of argument.
      fail_on_error: If true, then raise any exceptions encountered when
                     executing func. This is only applicable in the case of
                     process_count == thread_count == 1.
    """
    super(ProducerThread, self).__init__()
    self.func = func
    self.cls = cls
    self.args_iterator = args_iterator
    self.caller_id = caller_id
    self.task_queue = task_queue
    self.arg_checker = arg_checker
    self.exception_handler = exception_handler
    self.should_return_results = should_return_results
    self.fail_on_error = fail_on_error
    self.shared_variables_updater = _SharedVariablesUpdater()
    # Daemonize so this thread can't keep the interpreter alive on exit.
    self.daemon = True
    # Exception raised outside argument iteration (see run()), if any.
    self.unknown_exception = None
    # Exception raised by the argument iterator itself, if any.
    self.iterator_exception = None
    self.start()

  def run(self):
    """Enqueues one Task per accepted argument, then signals completion.

    Tasks are enqueued with a one-element lag (cur_task/last_task below) so
    that the final task reaches the queue only after total_tasks[caller_id]
    has been set in the finally block; workers rely on that count to decide
    when all work for this caller_id is done.
    """
    num_tasks = 0
    cur_task = None
    last_task = None
    try:
      args_iterator = iter(self.args_iterator)
      while True:
        try:
          args = args_iterator.next()
        except StopIteration, e:
          break
        except Exception, e:  # pylint: disable=broad-except
          _IncrementFailureCount()
          if self.fail_on_error:
            self.iterator_exception = e
            raise
          else:
            try:
              self.exception_handler(self.cls, e)
            except Exception, _:  # pylint: disable=broad-except
              # A failing exception handler must not kill the producer.
              self.cls.logger.debug(
                  'Caught exception while handling exception for %s:\n%s',
                  self.func, traceback.format_exc())
            self.shared_variables_updater.Update(self.caller_id, self.cls)
            continue

        if self.arg_checker(self.cls, args):
          num_tasks += 1
          last_task = cur_task
          cur_task = Task(self.func, args, self.caller_id,
                          self.exception_handler, self.should_return_results,
                          self.arg_checker, self.fail_on_error)
          if last_task:
            self.task_queue.put(last_task)
    except Exception, e:  # pylint: disable=broad-except
      # This will also catch any exception raised due to an error in the
      # iterator when fail_on_error is set, so check that we failed for some
      # other reason before claiming that we had an unknown exception.
      if not self.iterator_exception:
        self.unknown_exception = e
    finally:
      # We need to make sure to update total_tasks[caller_id] before we enqueue
      # the last task. Otherwise, a worker can retrieve the last task and
      # complete it, then check total_tasks and determine that we're not done
      # producing all before we update total_tasks. This approach forces workers
      # to wait on the last task until after we've updated total_tasks.
      total_tasks[self.caller_id] = num_tasks
      if not cur_task:
        # This happens if there were zero arguments to be put in the queue.
        cur_task = Task(None, ZERO_TASKS_TO_DO_ARGUMENT, self.caller_id,
                        None, None, None, None)
      self.task_queue.put(cur_task)

      # It's possible that the workers finished before we updated total_tasks,
      # so we need to check here as well.
      _NotifyIfDone(self.caller_id,
                    caller_id_finished_count.Get(self.caller_id))
| 1592 | |
| 1593 | |
class WorkerPool(object):
  """Fixed-size pool of WorkerThreads sharing a single task queue."""

  def __init__(self, thread_count, logger, bucket_storage_uri_class=None,
               gsutil_api_map=None, debug=0):
    """Creates the shared queue, then spawns thread_count WorkerThreads."""
    self.task_queue = _NewThreadsafeQueue()
    self.threads = [
        WorkerThread(self.task_queue, logger,
                     bucket_storage_uri_class=bucket_storage_uri_class,
                     gsutil_api_map=gsutil_api_map, debug=debug)
        for _ in range(thread_count)]
    for thread in self.threads:
      thread.start()

  def AddTask(self, task):
    """Enqueues a task for consumption by the next idle worker thread."""
    self.task_queue.put(task)
| 1611 | |
| 1612 | |
class WorkerThread(threading.Thread):
  """Thread where all the work will be performed.

  This makes the function calls for Apply and takes care of all error handling,
  return value propagation, and shared_vars.

  Note that this thread is NOT started upon instantiation because the function-
  calling logic is also used in the single-threaded case.
  """

  def __init__(self, task_queue, logger, bucket_storage_uri_class=None,
               gsutil_api_map=None, debug=0):
    """Initializes the worker thread.

    Args:
      task_queue: The thread-safe queue from which this thread should obtain
                  its work.
      logger: Logger to use for this thread.
      bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
                                Settable for testing/mocking.
      gsutil_api_map: Map of providers and API selector tuples to api classes
                      which can be used to communicate with those providers.
                      Used for the instantiating CloudApiDelegator class.
      debug: debug level for the CloudApiDelegator class.
    """
    super(WorkerThread, self).__init__()
    self.task_queue = task_queue
    # Daemonize so this thread can't keep the interpreter alive on exit.
    self.daemon = True
    # Maps caller_id -> this thread's private copy of the calling Command
    # instance, so task context isn't shared across threads.
    self.cached_classes = {}
    self.shared_vars_updater = _SharedVariablesUpdater()

    # Per-thread API delegator (if configured), so each worker has its own.
    self.thread_gsutil_api = None
    if bucket_storage_uri_class and gsutil_api_map:
      self.thread_gsutil_api = CloudApiDelegator(
          bucket_storage_uri_class, gsutil_api_map, logger, debug=debug)

  def PerformTask(self, task, cls):
    """Makes the function call for a task.

    Args:
      task: The Task to perform.
      cls: The instance of a class which gives context to the functions called
           by the Task's function. E.g., see SetAclFuncWrapper.
    """
    caller_id = task.caller_id
    try:
      results = task.func(cls, task.args, thread_state=self.thread_gsutil_api)
      if task.should_return_results:
        global_return_values_map.Update(caller_id, [results], default_value=[])
    except Exception, e:  # pylint: disable=broad-except
      _IncrementFailureCount()
      if task.fail_on_error:
        raise  # Only happens for single thread and process case.
      else:
        try:
          task.exception_handler(cls, e)
        except Exception, _:  # pylint: disable=broad-except
          # Don't allow callers to raise exceptions here and kill the worker
          # threads.
          cls.logger.debug(
              'Caught exception while handling exception for %s:\n%s',
              task, traceback.format_exc())
    finally:
      self.shared_vars_updater.Update(caller_id, cls)

      # Even if we encounter an exception, we still need to claim that
      # the function finished executing. Otherwise, we won't know when to
      # stop waiting and return results.
      num_done = caller_id_finished_count.Update(caller_id, 1)

      if cls.multiprocessing_is_available:
        _NotifyIfDone(caller_id, num_done)

  def run(self):
    """Pulls tasks from the queue and performs them; loops forever."""
    while True:
      task = self.task_queue.get()
      caller_id = task.caller_id

      # Get the instance of the command with the appropriate context.
      cls = self.cached_classes.get(caller_id, None)
      if not cls:
        cls = copy.copy(class_map[caller_id])
        cls.logger = CreateGsutilLogger(cls.command_name)
        self.cached_classes[caller_id] = cls

      self.PerformTask(task, cls)
| 1699 | |
| 1700 | |
| 1701 class _SharedVariablesUpdater(object): | |
| 1702 """Used to update shared variable for a class in the global map. | |
| 1703 | |
| 1704 Note that each thread will have its own instance of the calling class for | |
| 1705 context, and it will also have its own instance of a | |
| 1706 _SharedVariablesUpdater. This is used in the following way: | |
| 1707 | |
| 1708 1. Before any tasks are performed, each thread will get a copy of the | |
| 1709 calling class, and the globally-consistent value of this shared variable | |
| 1710 will be initialized to whatever it was before the call to Apply began. | |
| 1711 | |
| 1712 2. After each time a thread performs a task, it will look at the current | |
| 1713 values of the shared variables in its instance of the calling class. | |
| 1714 | |
| 1715 2.A. For each such variable, it computes the delta of this variable | |
| 1716 between the last known value for this class (which is stored in | |
| 1717 a dict local to this class) and the current value of the variable | |
| 1718 in the class. | |
| 1719 | |
| 1720 2.B. Using this delta, we update the last known value locally as well | |
| 1721 as the globally-consistent value shared across all classes (the | |
| 1722 globally consistent value is simply increased by the computed | |
| 1723 delta). | |
| 1724 """ | |
| 1725 | |
| 1726 def __init__(self): | |
| 1727 self.last_shared_var_values = {} | |
| 1728 | |
| 1729 def Update(self, caller_id, cls): | |
| 1730 """Update any shared variables with their deltas.""" | |
| 1731 shared_vars = shared_vars_list_map.get(caller_id, None) | |
| 1732 if shared_vars: | |
| 1733 for name in shared_vars: | |
| 1734 key = (caller_id, name) | |
| 1735 last_value = self.last_shared_var_values.get(key, 0) | |
| 1736 # Compute the change made since the last time we updated here. This is | |
| 1737 # calculated by simply subtracting the last known value from the current | |
| 1738 # value in the class instance. | |
| 1739 delta = getattr(cls, name) - last_value | |
| 1740 self.last_shared_var_values[key] = delta + last_value | |
| 1741 | |
| 1742 # Update the globally-consistent value by simply increasing it by the | |
| 1743 # computed delta. | |
| 1744 shared_vars_map.Update(key, delta) | |
| 1745 | |
| 1746 | |
def _NotifyIfDone(caller_id, num_done):
  """Notify any threads waiting for results that something has finished.

  Each waiting thread will then need to check the call_completed_map to see if
  its work is done.

  Note that num_done could be calculated here, but it is passed in as an
  optimization so that we have one less call to a globally-locked data
  structure.

  Args:
    caller_id: The caller_id of the function whose progress we're checking.
    num_done: The number of tasks currently completed for that caller_id.
  """
  expected = total_tasks[caller_id]
  # A negative expected count means the producer hasn't finished enumerating
  # tasks yet, so we can't possibly be done.
  if expected >= 0 and num_done == expected:
    # Wake the sleeping Apply call; it's ready to return.
    with need_pool_or_done_cond:
      call_completed_map[caller_id] = True
      need_pool_or_done_cond.notify_all()
| 1767 | |
| 1768 | |
def ShutDownGsutil():
  """Shut down all processes in consumer pools in preparation for exiting."""
  for queue in queues:
    try:
      queue.cancel_join_thread()
    except:  # pylint: disable=bare-except
      # Best effort only; keep going even if this queue can't be detached.
      pass
  for pool in consumer_pools:
    pool.ShutDown()
| 1778 | |
| 1779 | |
# pylint: disable=global-variable-undefined
def _IncrementFailureCount():
  """Adds one to the global failure counter (int or multiprocessing.Value)."""
  global failure_count
  if not isinstance(failure_count, int):
    # Consumer pools exist, so this is a multiprocessing.Value() of type 'i'.
    failure_count.value += 1
  else:
    failure_count += 1
| 1787 | |
| 1788 | |
# pylint: disable=global-variable-undefined
def GetFailureCount():
  """Returns the number of failures processed during calls to Apply()."""
  try:
    count = failure_count
  except NameError:
    # failure_count was never initialized, meaning Apply() wasn't called.
    return 0
  if isinstance(count, int):
    return count
  # Otherwise it's a multiprocessing.Value() of type 'i'.
  return count.value
| 1799 | |
| 1800 | |
def ResetFailureCount():
  """Zeroes the global failure counter - useful when an error is expected."""
  global failure_count
  try:
    if isinstance(failure_count, int):
      failure_count = 0
    else:
      # It's a multiprocessing.Value() of type 'i'; replace with a fresh one.
      failure_count = multiprocessing.Value('i', 0)
  except NameError:
    # failure_count was never initialized, meaning Apply() wasn't called.
    pass
| OLD | NEW |