| Index: third_party/gsutil/gslib/command.py
|
| diff --git a/third_party/gsutil/gslib/command.py b/third_party/gsutil/gslib/command.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..3823b91505696bf08f8c3edd03d4af061a450575
|
| --- /dev/null
|
| +++ b/third_party/gsutil/gslib/command.py
|
| @@ -0,0 +1,1810 @@
|
| +# -*- coding: utf-8 -*-
|
| +# Copyright 2010 Google Inc. All Rights Reserved.
|
| +#
|
| +# Licensed under the Apache License, Version 2.0 (the "License");
|
| +# you may not use this file except in compliance with the License.
|
| +# You may obtain a copy of the License at
|
| +#
|
| +# http://www.apache.org/licenses/LICENSE-2.0
|
| +#
|
| +# Unless required by applicable law or agreed to in writing, software
|
| +# distributed under the License is distributed on an "AS IS" BASIS,
|
| +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| +# See the License for the specific language governing permissions and
|
| +# limitations under the License.
|
| +"""Base class for gsutil commands.
|
| +
|
| +In addition to base class code, this file contains helpers that depend on base
|
| +class state (such as GetAndPrintAcl) In general, functions that depend on
|
| +class state and that are used by multiple commands belong in this file.
|
| +Functions that don't depend on class state belong in util.py, and non-shared
|
| +helpers belong in individual subclasses.
|
| +"""
|
| +
|
| +from __future__ import absolute_import
|
| +
|
| +import codecs
|
| +from collections import namedtuple
|
| +import copy
|
| +import getopt
|
| +import logging
|
| +import multiprocessing
|
| +import os
|
| +import Queue
|
| +import signal
|
| +import sys
|
| +import textwrap
|
| +import threading
|
| +import traceback
|
| +
|
| +import boto
|
| +from boto.storage_uri import StorageUri
|
| +import gslib
|
| +from gslib.cloud_api import AccessDeniedException
|
| +from gslib.cloud_api import ArgumentException
|
| +from gslib.cloud_api import ServiceException
|
| +from gslib.cloud_api_delegator import CloudApiDelegator
|
| +from gslib.cs_api_map import ApiSelector
|
| +from gslib.cs_api_map import GsutilApiMapFactory
|
| +from gslib.exception import CommandException
|
| +from gslib.help_provider import HelpProvider
|
| +from gslib.name_expansion import NameExpansionIterator
|
| +from gslib.name_expansion import NameExpansionResult
|
| +from gslib.parallelism_framework_util import AtomicIncrementDict
|
| +from gslib.parallelism_framework_util import BasicIncrementDict
|
| +from gslib.parallelism_framework_util import ThreadAndProcessSafeDict
|
| +from gslib.plurality_checkable_iterator import PluralityCheckableIterator
|
| +from gslib.sig_handling import RegisterSignalHandler
|
| +from gslib.storage_url import StorageUrlFromString
|
| +from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
|
| +from gslib.translation_helper import AclTranslation
|
| +from gslib.util import GetConfigFilePath
|
| +from gslib.util import GsutilStreamHandler
|
| +from gslib.util import HaveFileUrls
|
| +from gslib.util import HaveProviderUrls
|
| +from gslib.util import IS_WINDOWS
|
| +from gslib.util import MultiprocessingIsAvailable
|
| +from gslib.util import NO_MAX
|
| +from gslib.util import UrlsAreForSingleProvider
|
| +from gslib.util import UTF8
|
| +from gslib.wildcard_iterator import CreateWildcardIterator
|
| +
|
| +OFFER_GSUTIL_M_SUGGESTION_THRESHOLD = 5
|
| +
|
| +if IS_WINDOWS:
|
| + import ctypes # pylint: disable=g-import-not-at-top
|
| +
|
| +
|
| +def _DefaultExceptionHandler(cls, e):
|
| + cls.logger.exception(e)
|
| +
|
| +
|
| +def CreateGsutilLogger(command_name):
|
| + """Creates a logger that resembles 'print' output.
|
| +
|
| + This logger abides by gsutil -d/-D/-DD/-q options.
|
| +
|
| + By default (if none of the above options is specified) the logger will display
|
| + all messages logged with level INFO or above. Log propagation is disabled.
|
| +
|
| + Args:
|
| + command_name: Command name to create logger for.
|
| +
|
| + Returns:
|
| + A logger object.
|
| + """
|
| + log = logging.getLogger(command_name)
|
| + log.propagate = False
|
| + log.setLevel(logging.root.level)
|
| + log_handler = GsutilStreamHandler()
|
| + log_handler.setFormatter(logging.Formatter('%(message)s'))
|
| + # Commands that call other commands (like mv) would cause log handlers to be
|
| + # added more than once, so avoid adding if one is already present.
|
| + if not log.handlers:
|
| + log.addHandler(log_handler)
|
| + return log
|
| +
|
| +
|
| +def _UrlArgChecker(command_instance, url):
|
| + if not command_instance.exclude_symlinks:
|
| + return True
|
| + exp_src_url = url.expanded_storage_url
|
| + if exp_src_url.IsFileUrl() and os.path.islink(exp_src_url.object_name):
|
| + command_instance.logger.info('Skipping symbolic link %s...', exp_src_url)
|
| + return False
|
| + return True
|
| +
|
| +
|
| +def DummyArgChecker(*unused_args):
|
| + return True
|
| +
|
| +
|
| +def SetAclFuncWrapper(cls, name_expansion_result, thread_state=None):
|
| + return cls.SetAclFunc(name_expansion_result, thread_state=thread_state)
|
| +
|
| +
|
| +def SetAclExceptionHandler(cls, e):
|
| + """Exception handler that maintains state about post-completion status."""
|
| + cls.logger.error(str(e))
|
| + cls.everything_set_okay = False
|
| +
|
| +# We will keep this list of all thread- or process-safe queues ever created by
|
| +# the main thread so that we can forcefully kill them upon shutdown. Otherwise,
|
| +# we encounter a Python bug in which empty queues block forever on join (which
|
| +# is called as part of the Python exit function cleanup) under the impression
|
| +# that they are non-empty.
|
| +# However, this also lets us shut down somewhat more cleanly when interrupted.
|
| +queues = []
|
| +
|
| +
|
| +def _NewMultiprocessingQueue():
|
| + queue = multiprocessing.Queue(MAX_QUEUE_SIZE)
|
| + queues.append(queue)
|
| + return queue
|
| +
|
| +
|
| +def _NewThreadsafeQueue():
|
| + queue = Queue.Queue(MAX_QUEUE_SIZE)
|
| + queues.append(queue)
|
| + return queue
|
| +
|
| +# The maximum size of a process- or thread-safe queue. Imposing this limit
|
| +# prevents us from needing to hold an arbitrary amount of data in memory.
|
| +# However, setting this number too high (e.g., >= 32768 on OS X) can cause
|
| +# problems on some operating systems.
|
| +MAX_QUEUE_SIZE = 32500
|
| +
|
| +# That maximum depth of the tree of recursive calls to command.Apply. This is
|
| +# an arbitrary limit put in place to prevent developers from accidentally
|
| +# causing problems with infinite recursion, and it can be increased if needed.
|
| +MAX_RECURSIVE_DEPTH = 5
|
| +
|
| +ZERO_TASKS_TO_DO_ARGUMENT = ('There were no', 'tasks to do')
|
| +
|
| +# Map from deprecated aliases to the current command and subcommands that
|
| +# provide the same behavior.
|
| +# TODO: Remove this map and deprecate old commands on 9/9/14.
|
| +OLD_ALIAS_MAP = {'chacl': ['acl', 'ch'],
|
| + 'getacl': ['acl', 'get'],
|
| + 'setacl': ['acl', 'set'],
|
| + 'getcors': ['cors', 'get'],
|
| + 'setcors': ['cors', 'set'],
|
| + 'chdefacl': ['defacl', 'ch'],
|
| + 'getdefacl': ['defacl', 'get'],
|
| + 'setdefacl': ['defacl', 'set'],
|
| + 'disablelogging': ['logging', 'set', 'off'],
|
| + 'enablelogging': ['logging', 'set', 'on'],
|
| + 'getlogging': ['logging', 'get'],
|
| + 'getversioning': ['versioning', 'get'],
|
| + 'setversioning': ['versioning', 'set'],
|
| + 'getwebcfg': ['web', 'get'],
|
| + 'setwebcfg': ['web', 'set']}
|
| +
|
| +
|
| +# Declare all of the module level variables - see
|
| +# InitializeMultiprocessingVariables for an explanation of why this is
|
| +# necessary.
|
| +# pylint: disable=global-at-module-level
|
| +global manager, consumer_pools, task_queues, caller_id_lock, caller_id_counter
|
| +global total_tasks, call_completed_map, global_return_values_map
|
| +global need_pool_or_done_cond, caller_id_finished_count, new_pool_needed
|
| +global current_max_recursive_level, shared_vars_map, shared_vars_list_map
|
| +global class_map, worker_checking_level_lock, failure_count
|
| +
|
| +
|
| +def InitializeMultiprocessingVariables():
|
| + """Initializes module-level variables that will be inherited by subprocesses.
|
| +
|
| + On Windows, a multiprocessing.Manager object should only
|
| + be created within an "if __name__ == '__main__':" block. This function
|
| + must be called, otherwise every command that calls Command.Apply will fail.
|
| + """
|
| + # This list of global variables must exactly match the above list of
|
| + # declarations.
|
| + # pylint: disable=global-variable-undefined
|
| + global manager, consumer_pools, task_queues, caller_id_lock, caller_id_counter
|
| + global total_tasks, call_completed_map, global_return_values_map
|
| + global need_pool_or_done_cond, caller_id_finished_count, new_pool_needed
|
| + global current_max_recursive_level, shared_vars_map, shared_vars_list_map
|
| + global class_map, worker_checking_level_lock, failure_count
|
| +
|
| + manager = multiprocessing.Manager()
|
| +
|
| + consumer_pools = []
|
| +
|
| + # List of all existing task queues - used by all pools to find the queue
|
| + # that's appropriate for the given recursive_apply_level.
|
| + task_queues = []
|
| +
|
| + # Used to assign a globally unique caller ID to each Apply call.
|
| + caller_id_lock = manager.Lock()
|
| + caller_id_counter = multiprocessing.Value('i', 0)
|
| +
|
| + # Map from caller_id to total number of tasks to be completed for that ID.
|
| + total_tasks = ThreadAndProcessSafeDict(manager)
|
| +
|
| + # Map from caller_id to a boolean which is True iff all its tasks are
|
| + # finished.
|
| + call_completed_map = ThreadAndProcessSafeDict(manager)
|
| +
|
| + # Used to keep track of the set of return values for each caller ID.
|
| + global_return_values_map = AtomicIncrementDict(manager)
|
| +
|
| + # Condition used to notify any waiting threads that a task has finished or
|
| + # that a call to Apply needs a new set of consumer processes.
|
| + need_pool_or_done_cond = manager.Condition()
|
| +
|
| + # Lock used to prevent multiple worker processes from asking the main thread
|
| + # to create a new consumer pool for the same level.
|
| + worker_checking_level_lock = manager.Lock()
|
| +
|
| + # Map from caller_id to the current number of completed tasks for that ID.
|
| + caller_id_finished_count = AtomicIncrementDict(manager)
|
| +
|
| + # Used as a way for the main thread to distinguish between being woken up
|
| + # by another call finishing and being woken up by a call that needs a new set
|
| + # of consumer processes.
|
| + new_pool_needed = multiprocessing.Value('i', 0)
|
| +
|
| + current_max_recursive_level = multiprocessing.Value('i', 0)
|
| +
|
| + # Map from (caller_id, name) to the value of that shared variable.
|
| + shared_vars_map = AtomicIncrementDict(manager)
|
| + shared_vars_list_map = ThreadAndProcessSafeDict(manager)
|
| +
|
| + # Map from caller_id to calling class.
|
| + class_map = manager.dict()
|
| +
|
| + # Number of tasks that resulted in an exception in calls to Apply().
|
| + failure_count = multiprocessing.Value('i', 0)
|
| +
|
| +
|
| +# Each subclass of Command must define a property named 'command_spec' that is
|
| +# an instance of the following class.
|
| +CommandSpec = namedtuple('CommandSpec', [
|
| + # Name of command.
|
| + 'command_name',
|
| + # Usage synopsis.
|
| + 'usage_synopsis',
|
| + # List of command name aliases.
|
| + 'command_name_aliases',
|
| + # Min number of args required by this command.
|
| + 'min_args',
|
| + # Max number of args required by this command, or NO_MAX.
|
| + 'max_args',
|
| + # Getopt-style string specifying acceptable sub args.
|
| + 'supported_sub_args',
|
| + # True if file URLs are acceptable for this command.
|
| + 'file_url_ok',
|
| + # True if provider-only URLs are acceptable for this command.
|
| + 'provider_url_ok',
|
| + # Index in args of first URL arg.
|
| + 'urls_start_arg',
|
| + # List of supported APIs
|
| + 'gs_api_support',
|
| + # Default API to use for this command
|
| + 'gs_default_api',
|
| + # Private arguments (for internal testing)
|
| + 'supported_private_args',
|
| + 'argparse_arguments',
|
| +])
|
| +
|
| +
|
| +class Command(HelpProvider):
|
| + """Base class for all gsutil commands."""
|
| +
|
| + # Each subclass must override this with an instance of CommandSpec.
|
| + command_spec = None
|
| +
|
| + _commands_with_subcommands_and_subopts = ['acl', 'defacl', 'logging', 'web',
|
| + 'notification']
|
| +
|
| + # This keeps track of the recursive depth of the current call to Apply.
|
| + recursive_apply_level = 0
|
| +
|
| + # If the multiprocessing module isn't available, we'll use this to keep track
|
| + # of the caller_id.
|
| + sequential_caller_id = -1
|
| +
|
| + @staticmethod
|
| + def CreateCommandSpec(command_name, usage_synopsis=None,
|
| + command_name_aliases=None, min_args=0,
|
| + max_args=NO_MAX, supported_sub_args='',
|
| + file_url_ok=False, provider_url_ok=False,
|
| + urls_start_arg=0, gs_api_support=None,
|
| + gs_default_api=None, supported_private_args=None,
|
| + argparse_arguments=None):
|
| + """Creates an instance of CommandSpec, with defaults."""
|
| + return CommandSpec(
|
| + command_name=command_name,
|
| + usage_synopsis=usage_synopsis,
|
| + command_name_aliases=command_name_aliases or [],
|
| + min_args=min_args,
|
| + max_args=max_args,
|
| + supported_sub_args=supported_sub_args,
|
| + file_url_ok=file_url_ok,
|
| + provider_url_ok=provider_url_ok,
|
| + urls_start_arg=urls_start_arg,
|
| + gs_api_support=gs_api_support or [ApiSelector.XML],
|
| + gs_default_api=gs_default_api or ApiSelector.XML,
|
| + supported_private_args=supported_private_args,
|
| + argparse_arguments=argparse_arguments or [])
|
| +
|
| + # Define a convenience property for command name, since it's used many places.
|
| + def _GetDefaultCommandName(self):
|
| + return self.command_spec.command_name
|
| + command_name = property(_GetDefaultCommandName)
|
| +
|
| + def _CalculateUrlsStartArg(self):
|
| + """Calculate the index in args of the first URL arg.
|
| +
|
| + Returns:
|
| + Index of the first URL arg (according to the command spec).
|
| + """
|
| + return self.command_spec.urls_start_arg
|
| +
|
| + def _TranslateDeprecatedAliases(self, args):
|
| + """Map deprecated aliases to the corresponding new command, and warn."""
|
| + new_command_args = OLD_ALIAS_MAP.get(self.command_alias_used, None)
|
| + if new_command_args:
|
| + # Prepend any subcommands for the new command. The command name itself
|
| + # is not part of the args, so leave it out.
|
| + args = new_command_args[1:] + args
|
| + self.logger.warn('\n'.join(textwrap.wrap(
|
| + ('You are using a deprecated alias, "%(used_alias)s", for the '
|
| + '"%(command_name)s" command. This will stop working on 9/9/2014. '
|
| + 'Please use "%(command_name)s" with the appropriate sub-command in '
|
| + 'the future. See "gsutil help %(command_name)s" for details.') %
|
| + {'used_alias': self.command_alias_used,
|
| + 'command_name': self.command_name})))
|
| + return args
|
| +
|
| + def __init__(self, command_runner, args, headers, debug, parallel_operations,
|
| + bucket_storage_uri_class, gsutil_api_class_map_factory,
|
| + test_method=None, logging_filters=None,
|
| + command_alias_used=None):
|
| + """Instantiates a Command.
|
| +
|
| + Args:
|
| + command_runner: CommandRunner (for commands built atop other commands).
|
| + args: Command-line args (arg0 = actual arg, not command name ala bash).
|
| + headers: Dictionary containing optional HTTP headers to pass to boto.
|
| + debug: Debug level to pass in to boto connection (range 0..3).
|
| + parallel_operations: Should command operations be executed in parallel?
|
| + bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
|
| + Settable for testing/mocking.
|
| + gsutil_api_class_map_factory: Creates map of cloud storage interfaces.
|
| + Settable for testing/mocking.
|
| + test_method: Optional general purpose method for testing purposes.
|
| + Application and semantics of this method will vary by
|
| + command and test type.
|
| + logging_filters: Optional list of logging.Filters to apply to this
|
| + command's logger.
|
| + command_alias_used: The alias that was actually used when running this
|
| + command (as opposed to the "official" command name,
|
| + which will always correspond to the file name).
|
| +
|
| + Implementation note: subclasses shouldn't need to define an __init__
|
| + method, and instead depend on the shared initialization that happens
|
| + here. If you do define an __init__ method in a subclass you'll need to
|
| + explicitly call super().__init__(). But you're encouraged not to do this,
|
| + because it will make changing the __init__ interface more painful.
|
| + """
|
| + # Save class values from constructor params.
|
| + self.command_runner = command_runner
|
| + self.unparsed_args = args
|
| + self.headers = headers
|
| + self.debug = debug
|
| + self.parallel_operations = parallel_operations
|
| + self.bucket_storage_uri_class = bucket_storage_uri_class
|
| + self.gsutil_api_class_map_factory = gsutil_api_class_map_factory
|
| + self.test_method = test_method
|
| + self.exclude_symlinks = False
|
| + self.recursion_requested = False
|
| + self.all_versions = False
|
| + self.command_alias_used = command_alias_used
|
| +
|
| + # Global instance of a threaded logger object.
|
| + self.logger = CreateGsutilLogger(self.command_name)
|
| + if logging_filters:
|
| + for log_filter in logging_filters:
|
| + self.logger.addFilter(log_filter)
|
| +
|
| + if self.command_spec is None:
|
| + raise CommandException('"%s" command implementation is missing a '
|
| + 'command_spec definition.' % self.command_name)
|
| +
|
| + # Parse and validate args.
|
| + self.args = self._TranslateDeprecatedAliases(args)
|
| + self.ParseSubOpts()
|
| +
|
| + # Named tuple public functions start with _
|
| + # pylint: disable=protected-access
|
| + self.command_spec = self.command_spec._replace(
|
| + urls_start_arg=self._CalculateUrlsStartArg())
|
| +
|
| + if (len(self.args) < self.command_spec.min_args
|
| + or len(self.args) > self.command_spec.max_args):
|
| + self.RaiseWrongNumberOfArgumentsException()
|
| +
|
| + if self.command_name not in self._commands_with_subcommands_and_subopts:
|
| + self.CheckArguments()
|
| +
|
| + # Build the support and default maps from the command spec.
|
| + support_map = {
|
| + 'gs': self.command_spec.gs_api_support,
|
| + 's3': [ApiSelector.XML]
|
| + }
|
| + default_map = {
|
| + 'gs': self.command_spec.gs_default_api,
|
| + 's3': ApiSelector.XML
|
| + }
|
| + self.gsutil_api_map = GsutilApiMapFactory.GetApiMap(
|
| + self.gsutil_api_class_map_factory, support_map, default_map)
|
| +
|
| + self.project_id = None
|
| + self.gsutil_api = CloudApiDelegator(
|
| + bucket_storage_uri_class, self.gsutil_api_map,
|
| + self.logger, debug=self.debug)
|
| +
|
| + # Cross-platform path to run gsutil binary.
|
| + self.gsutil_cmd = ''
|
| + # If running on Windows, invoke python interpreter explicitly.
|
| + if gslib.util.IS_WINDOWS:
|
| + self.gsutil_cmd += 'python '
|
| + # Add full path to gsutil to make sure we test the correct version.
|
| + self.gsutil_path = gslib.GSUTIL_PATH
|
| + self.gsutil_cmd += self.gsutil_path
|
| +
|
| + # We're treating recursion_requested like it's used by all commands, but
|
| + # only some of the commands accept the -R option.
|
| + if self.sub_opts:
|
| + for o, unused_a in self.sub_opts:
|
| + if o == '-r' or o == '-R':
|
| + self.recursion_requested = True
|
| + break
|
| +
|
| + self.multiprocessing_is_available = MultiprocessingIsAvailable()[0]
|
| +
|
| + def RaiseWrongNumberOfArgumentsException(self):
|
| + """Raises exception for wrong number of arguments supplied to command."""
|
| + if len(self.args) < self.command_spec.min_args:
|
| + tail_str = 's' if self.command_spec.min_args > 1 else ''
|
| + message = ('The %s command requires at least %d argument%s.' %
|
| + (self.command_name, self.command_spec.min_args, tail_str))
|
| + else:
|
| + message = ('The %s command accepts at most %d arguments.' %
|
| + (self.command_name, self.command_spec.max_args))
|
| + message += ' Usage:\n%s\nFor additional help run:\n gsutil help %s' % (
|
| + self.command_spec.usage_synopsis, self.command_name)
|
| + raise CommandException(message)
|
| +
|
| + def RaiseInvalidArgumentException(self):
|
| + """Raises exception for specifying an invalid argument to command."""
|
| + message = ('Incorrect option(s) specified. Usage:\n%s\n'
|
| + 'For additional help run:\n gsutil help %s' % (
|
| + self.command_spec.usage_synopsis, self.command_name))
|
| + raise CommandException(message)
|
| +
|
| + def ParseSubOpts(self, check_args=False):
|
| + """Parses sub-opt args.
|
| +
|
| + Args:
|
| + check_args: True to have CheckArguments() called after parsing.
|
| +
|
| + Populates:
|
| + (self.sub_opts, self.args) from parsing.
|
| +
|
| + Raises: RaiseInvalidArgumentException if invalid args specified.
|
| + """
|
| + try:
|
| + self.sub_opts, self.args = getopt.getopt(
|
| + self.args, self.command_spec.supported_sub_args,
|
| + self.command_spec.supported_private_args or [])
|
| + except getopt.GetoptError:
|
| + self.RaiseInvalidArgumentException()
|
| + if check_args:
|
| + self.CheckArguments()
|
| +
|
| + def CheckArguments(self):
|
| + """Checks that command line arguments match the command_spec.
|
| +
|
| + Any commands in self._commands_with_subcommands_and_subopts are responsible
|
| + for calling this method after handling initial parsing of their arguments.
|
| + This prevents commands with sub-commands as well as options from breaking
|
| + the parsing of getopt.
|
| +
|
| + TODO: Provide a function to parse commands and sub-commands more
|
| + intelligently once we stop allowing the deprecated command versions.
|
| +
|
| + Raises:
|
| + CommandException if the arguments don't match.
|
| + """
|
| +
|
| + if (not self.command_spec.file_url_ok
|
| + and HaveFileUrls(self.args[self.command_spec.urls_start_arg:])):
|
| + raise CommandException('"%s" command does not support "file://" URLs. '
|
| + 'Did you mean to use a gs:// URL?' %
|
| + self.command_name)
|
| + if (not self.command_spec.provider_url_ok
|
| + and HaveProviderUrls(self.args[self.command_spec.urls_start_arg:])):
|
| + raise CommandException('"%s" command does not support provider-only '
|
| + 'URLs.' % self.command_name)
|
| +
|
| + def WildcardIterator(self, url_string, all_versions=False):
|
| + """Helper to instantiate gslib.WildcardIterator.
|
| +
|
| + Args are same as gslib.WildcardIterator interface, but this method fills in
|
| + most of the values from instance state.
|
| +
|
| + Args:
|
| + url_string: URL string naming wildcard objects to iterate.
|
| + all_versions: If true, the iterator yields all versions of objects
|
| + matching the wildcard. If false, yields just the live
|
| + object version.
|
| +
|
| + Returns:
|
| + WildcardIterator for use by caller.
|
| + """
|
| + return CreateWildcardIterator(
|
| + url_string, self.gsutil_api, all_versions=all_versions,
|
| + debug=self.debug, project_id=self.project_id)
|
| +
|
| + def RunCommand(self):
|
| + """Abstract function in base class. Subclasses must implement this.
|
| +
|
| + The return value of this function will be used as the exit status of the
|
| + process, so subclass commands should return an integer exit code (0 for
|
| + success, a value in [1,255] for failure).
|
| + """
|
| + raise CommandException('Command %s is missing its RunCommand() '
|
| + 'implementation' % self.command_name)
|
| +
|
| + ############################################################
|
| + # Shared helper functions that depend on base class state. #
|
| + ############################################################
|
| +
|
| + def ApplyAclFunc(self, acl_func, acl_excep_handler, url_strs):
|
| + """Sets the standard or default object ACL depending on self.command_name.
|
| +
|
| + Args:
|
| + acl_func: ACL function to be passed to Apply.
|
| + acl_excep_handler: ACL exception handler to be passed to Apply.
|
| + url_strs: URL strings on which to set ACL.
|
| +
|
| + Raises:
|
| + CommandException if an ACL could not be set.
|
| + """
|
| + multi_threaded_url_args = []
|
| + # Handle bucket ACL setting operations single-threaded, because
|
| + # our threading machinery currently assumes it's working with objects
|
| + # (name_expansion_iterator), and normally we wouldn't expect users to need
|
| + # to set ACLs on huge numbers of buckets at once anyway.
|
| + for url_str in url_strs:
|
| + url = StorageUrlFromString(url_str)
|
| + if url.IsCloudUrl() and url.IsBucket():
|
| + if self.recursion_requested:
|
| + # If user specified -R option, convert any bucket args to bucket
|
| + # wildcards (e.g., gs://bucket/*), to prevent the operation from
|
| + # being applied to the buckets themselves.
|
| + url.object_name = '*'
|
| + multi_threaded_url_args.append(url.url_string)
|
| + else:
|
| + # Convert to a NameExpansionResult so we can re-use the threaded
|
| + # function for the single-threaded implementation. RefType is unused.
|
| + for blr in self.WildcardIterator(url.url_string).IterBuckets(
|
| + bucket_fields=['id']):
|
| + name_expansion_for_url = NameExpansionResult(
|
| + url, False, False, blr.storage_url)
|
| + acl_func(self, name_expansion_for_url)
|
| + else:
|
| + multi_threaded_url_args.append(url_str)
|
| +
|
| + if len(multi_threaded_url_args) >= 1:
|
| + name_expansion_iterator = NameExpansionIterator(
|
| + self.command_name, self.debug,
|
| + self.logger, self.gsutil_api,
|
| + multi_threaded_url_args, self.recursion_requested,
|
| + all_versions=self.all_versions,
|
| + continue_on_error=self.continue_on_error or self.parallel_operations)
|
| +
|
| + # Perform requests in parallel (-m) mode, if requested, using
|
| + # configured number of parallel processes and threads. Otherwise,
|
| + # perform requests with sequential function calls in current process.
|
| + self.Apply(acl_func, name_expansion_iterator, acl_excep_handler,
|
| + fail_on_error=not self.continue_on_error)
|
| +
|
| + if not self.everything_set_okay and not self.continue_on_error:
|
| + raise CommandException('ACLs for some objects could not be set.')
|
| +
|
| + def SetAclFunc(self, name_expansion_result, thread_state=None):
|
| + """Sets the object ACL for the name_expansion_result provided.
|
| +
|
| + Args:
|
| + name_expansion_result: NameExpansionResult describing the target object.
|
| + thread_state: If present, use this gsutil Cloud API instance for the set.
|
| + """
|
| + if thread_state:
|
| + assert not self.def_acl
|
| + gsutil_api = thread_state
|
| + else:
|
| + gsutil_api = self.gsutil_api
|
| + op_string = 'default object ACL' if self.def_acl else 'ACL'
|
| + url = name_expansion_result.expanded_storage_url
|
| + self.logger.info('Setting %s on %s...', op_string, url)
|
| + if (gsutil_api.GetApiSelector(url.scheme) == ApiSelector.XML
|
| + and url.scheme != 'gs'):
|
| + # If we are called with a non-google ACL model, we need to use the XML
|
| + # passthrough. acl_arg should either be a canned ACL or an XML ACL.
|
| + self._SetAclXmlPassthrough(url, gsutil_api)
|
| + else:
|
| + # Normal Cloud API path. acl_arg is a JSON ACL or a canned ACL.
|
| + self._SetAclGsutilApi(url, gsutil_api)
|
| +
|
| + def _SetAclXmlPassthrough(self, url, gsutil_api):
|
| + """Sets the ACL for the URL provided using the XML passthrough functions.
|
| +
|
| + This function assumes that self.def_acl, self.canned,
|
| + and self.continue_on_error are initialized, and that self.acl_arg is
|
| + either an XML string or a canned ACL string.
|
| +
|
| + Args:
|
| + url: CloudURL to set the ACL on.
|
| + gsutil_api: gsutil Cloud API to use for the ACL set. Must support XML
|
| + passthrough functions.
|
| + """
|
| + try:
|
| + orig_prefer_api = gsutil_api.prefer_api
|
| + gsutil_api.prefer_api = ApiSelector.XML
|
| + gsutil_api.XmlPassThroughSetAcl(
|
| + self.acl_arg, url, canned=self.canned,
|
| + def_obj_acl=self.def_acl, provider=url.scheme)
|
| + except ServiceException as e:
|
| + if self.continue_on_error:
|
| + self.everything_set_okay = False
|
| + self.logger.error(e)
|
| + else:
|
| + raise
|
| + finally:
|
| + gsutil_api.prefer_api = orig_prefer_api
|
| +
|
| + def _SetAclGsutilApi(self, url, gsutil_api):
|
| + """Sets the ACL for the URL provided using the gsutil Cloud API.
|
| +
|
| + This function assumes that self.def_acl, self.canned,
|
| + and self.continue_on_error are initialized, and that self.acl_arg is
|
| + either a JSON string or a canned ACL string.
|
| +
|
| + Args:
|
| + url: CloudURL to set the ACL on.
|
| + gsutil_api: gsutil Cloud API to use for the ACL set.
|
| + """
|
| + try:
|
| + if url.IsBucket():
|
| + if self.def_acl:
|
| + if self.canned:
|
| + gsutil_api.PatchBucket(
|
| + url.bucket_name, apitools_messages.Bucket(),
|
| + canned_def_acl=self.acl_arg, provider=url.scheme, fields=['id'])
|
| + else:
|
| + def_obj_acl = AclTranslation.JsonToMessage(
|
| + self.acl_arg, apitools_messages.ObjectAccessControl)
|
| + bucket_metadata = apitools_messages.Bucket(
|
| + defaultObjectAcl=def_obj_acl)
|
| + gsutil_api.PatchBucket(url.bucket_name, bucket_metadata,
|
| + provider=url.scheme, fields=['id'])
|
| + else:
|
| + if self.canned:
|
| + gsutil_api.PatchBucket(
|
| + url.bucket_name, apitools_messages.Bucket(),
|
| + canned_acl=self.acl_arg, provider=url.scheme, fields=['id'])
|
| + else:
|
| + bucket_acl = AclTranslation.JsonToMessage(
|
| + self.acl_arg, apitools_messages.BucketAccessControl)
|
| + bucket_metadata = apitools_messages.Bucket(acl=bucket_acl)
|
| + gsutil_api.PatchBucket(url.bucket_name, bucket_metadata,
|
| + provider=url.scheme, fields=['id'])
|
| + else: # url.IsObject()
|
| + if self.canned:
|
| + gsutil_api.PatchObjectMetadata(
|
| + url.bucket_name, url.object_name, apitools_messages.Object(),
|
| + provider=url.scheme, generation=url.generation,
|
| + canned_acl=self.acl_arg)
|
| + else:
|
| + object_acl = AclTranslation.JsonToMessage(
|
| + self.acl_arg, apitools_messages.ObjectAccessControl)
|
| + object_metadata = apitools_messages.Object(acl=object_acl)
|
| + gsutil_api.PatchObjectMetadata(url.bucket_name, url.object_name,
|
| + object_metadata, provider=url.scheme,
|
| + generation=url.generation)
|
| + except ArgumentException, e:
|
| + raise
|
| + except ServiceException, e:
|
| + if self.continue_on_error:
|
| + self.everything_set_okay = False
|
| + self.logger.error(e)
|
| + else:
|
| + raise
|
| +
|
| + def SetAclCommandHelper(self, acl_func, acl_excep_handler):
|
| + """Sets ACLs on the self.args using the passed-in acl function.
|
| +
|
| + Args:
|
| + acl_func: ACL function to be passed to Apply.
|
| + acl_excep_handler: ACL exception handler to be passed to Apply.
|
| + """
|
| + acl_arg = self.args[0]
|
| + url_args = self.args[1:]
|
| + # Disallow multi-provider setacl requests, because there are differences in
|
| + # the ACL models.
|
| + if not UrlsAreForSingleProvider(url_args):
|
| + raise CommandException('"%s" command spanning providers not allowed.' %
|
| + self.command_name)
|
| +
|
| + # Determine whether acl_arg names a file containing XML ACL text vs. the
|
| + # string name of a canned ACL.
|
| + if os.path.isfile(acl_arg):
|
| + with codecs.open(acl_arg, 'r', UTF8) as f:
|
| + acl_arg = f.read()
|
| + self.canned = False
|
| + else:
|
| + # No file exists, so expect a canned ACL string.
|
| + # Canned ACLs are not supported in JSON and we need to use the XML API
|
| + # to set them.
|
| + # validate=False because we allow wildcard urls.
|
| + storage_uri = boto.storage_uri(
|
| + url_args[0], debug=self.debug, validate=False,
|
| + bucket_storage_uri_class=self.bucket_storage_uri_class)
|
| +
|
| + canned_acls = storage_uri.canned_acls()
|
| + if acl_arg not in canned_acls:
|
| + raise CommandException('Invalid canned ACL "%s".' % acl_arg)
|
| + self.canned = True
|
| +
|
| + # Used to track if any ACLs failed to be set.
|
| + self.everything_set_okay = True
|
| + self.acl_arg = acl_arg
|
| +
|
| + self.ApplyAclFunc(acl_func, acl_excep_handler, url_args)
|
| + if not self.everything_set_okay and not self.continue_on_error:
|
| + raise CommandException('ACLs for some objects could not be set.')
|
| +
|
| + def _WarnServiceAccounts(self):
|
| + """Warns service account users who have received an AccessDenied error.
|
| +
|
| + When one of the metadata-related commands fails due to AccessDenied, user
|
| + must ensure that they are listed as an Owner in the API console.
|
| + """
|
| + # Import this here so that the value will be set first in
|
| + # gcs_oauth2_boto_plugin.
|
| + # pylint: disable=g-import-not-at-top
|
| + from gcs_oauth2_boto_plugin.oauth2_plugin import IS_SERVICE_ACCOUNT
|
| +
|
| + if IS_SERVICE_ACCOUNT:
|
| + # This method is only called when canned ACLs are used, so the warning
|
| + # definitely applies.
|
| + self.logger.warning('\n'.join(textwrap.wrap(
|
| + 'It appears that your service account has been denied access while '
|
| + 'attempting to perform a metadata operation. If you believe that you '
|
| + 'should have access to this metadata (i.e., if it is associated with '
|
| + 'your account), please make sure that your service account''s email '
|
| + 'address is listed as an Owner in the Team tab of the API console. '
|
| + 'See "gsutil help creds" for further information.\n')))
|
| +
|
| + def GetAndPrintAcl(self, url_str):
|
| + """Prints the standard or default object ACL depending on self.command_name.
|
| +
|
| + Args:
|
| + url_str: URL string to get ACL for.
|
| + """
|
| + blr = self.GetAclCommandBucketListingReference(url_str)
|
| + url = StorageUrlFromString(url_str)
|
| + if (self.gsutil_api.GetApiSelector(url.scheme) == ApiSelector.XML
|
| + and url.scheme != 'gs'):
|
| + # Need to use XML passthrough.
|
| + try:
|
| + acl = self.gsutil_api.XmlPassThroughGetAcl(
|
| + url, def_obj_acl=self.def_acl, provider=url.scheme)
|
| + print acl.to_xml()
|
| + except AccessDeniedException, _:
|
| + self._WarnServiceAccounts()
|
| + raise
|
| + else:
|
| + if self.command_name == 'defacl':
|
| + acl = blr.root_object.defaultObjectAcl
|
| + if not acl:
|
| + self.logger.warn(
|
| + 'No default object ACL present for %s. This could occur if '
|
| + 'the default object ACL is private, in which case objects '
|
| + 'created in this bucket will be readable only by their '
|
| + 'creators. It could also mean you do not have OWNER permission '
|
| + 'on %s and therefore do not have permission to read the '
|
| + 'default object ACL.', url_str, url_str)
|
| + else:
|
| + acl = blr.root_object.acl
|
| + if not acl:
|
| + self._WarnServiceAccounts()
|
| + raise AccessDeniedException('Access denied. Please ensure you have '
|
| + 'OWNER permission on %s.' % url_str)
|
| + print AclTranslation.JsonFromMessage(acl)
|
| +
|
| + def GetAclCommandBucketListingReference(self, url_str):
|
| + """Gets a single bucket listing reference for an acl get command.
|
| +
|
| + Args:
|
| + url_str: URL string to get the bucket listing reference for.
|
| +
|
| + Returns:
|
| + BucketListingReference for the URL string.
|
| +
|
| + Raises:
|
| + CommandException if string did not result in exactly one reference.
|
| + """
|
| + # We're guaranteed by caller that we have the appropriate type of url
|
| + # string for the call (ex. we will never be called with an object string
|
| + # by getdefacl)
|
| + wildcard_url = StorageUrlFromString(url_str)
|
| + if wildcard_url.IsObject():
|
| + plurality_iter = PluralityCheckableIterator(
|
| + self.WildcardIterator(url_str).IterObjects(
|
| + bucket_listing_fields=['acl']))
|
| + else:
|
| + # Bucket or provider. We call IterBuckets explicitly here to ensure that
|
| + # the root object is populated with the acl.
|
| + if self.command_name == 'defacl':
|
| + bucket_fields = ['defaultObjectAcl']
|
| + else:
|
| + bucket_fields = ['acl']
|
| + plurality_iter = PluralityCheckableIterator(
|
| + self.WildcardIterator(url_str).IterBuckets(
|
| + bucket_fields=bucket_fields))
|
| + if plurality_iter.IsEmpty():
|
| + raise CommandException('No URLs matched')
|
| + if plurality_iter.HasPlurality():
|
| + raise CommandException(
|
| + '%s matched more than one URL, which is not allowed by the %s '
|
| + 'command' % (url_str, self.command_name))
|
| + return list(plurality_iter)[0]
|
| +
|
| + def _HandleMultiProcessingSigs(self, unused_signal_num,
|
| + unused_cur_stack_frame):
|
| + """Handles signals INT AND TERM during a multi-process/multi-thread request.
|
| +
|
| + Kills subprocesses.
|
| +
|
| + Args:
|
| + unused_signal_num: signal generated by ^C.
|
| + unused_cur_stack_frame: Current stack frame.
|
| + """
|
| + # Note: This only works under Linux/MacOS. See
|
| + # https://github.com/GoogleCloudPlatform/gsutil/issues/99 for details
|
| + # about why making it work correctly across OS's is harder and still open.
|
| + ShutDownGsutil()
|
| + sys.stderr.write('Caught ^C - exiting\n')
|
| + # Simply calling sys.exit(1) doesn't work - see above bug for details.
|
| + KillProcess(os.getpid())
|
| +
|
| + def GetSingleBucketUrlFromArg(self, arg, bucket_fields=None):
|
| + """Gets a single bucket URL based on the command arguments.
|
| +
|
| + Args:
|
| + arg: String argument to get bucket URL for.
|
| + bucket_fields: Fields to populate for the bucket.
|
| +
|
| + Returns:
|
| + (StorageUrl referring to a single bucket, Bucket metadata).
|
| +
|
| + Raises:
|
| + CommandException if args did not match exactly one bucket.
|
| + """
|
| + plurality_checkable_iterator = self.GetBucketUrlIterFromArg(
|
| + arg, bucket_fields=bucket_fields)
|
| + if plurality_checkable_iterator.HasPlurality():
|
| + raise CommandException(
|
| + '%s matched more than one URL, which is not\n'
|
| + 'allowed by the %s command' % (arg, self.command_name))
|
| + blr = list(plurality_checkable_iterator)[0]
|
| + return StorageUrlFromString(blr.url_string), blr.root_object
|
| +
|
| + def GetBucketUrlIterFromArg(self, arg, bucket_fields=None):
|
| + """Gets a single bucket URL based on the command arguments.
|
| +
|
| + Args:
|
| + arg: String argument to iterate over.
|
| + bucket_fields: Fields to populate for the bucket.
|
| +
|
| + Returns:
|
| + PluralityCheckableIterator over buckets.
|
| +
|
| + Raises:
|
| + CommandException if iterator matched no buckets.
|
| + """
|
| + arg_url = StorageUrlFromString(arg)
|
| + if not arg_url.IsCloudUrl() or arg_url.IsObject():
|
| + raise CommandException('"%s" command must specify a bucket' %
|
| + self.command_name)
|
| +
|
| + plurality_checkable_iterator = PluralityCheckableIterator(
|
| + self.WildcardIterator(arg).IterBuckets(
|
| + bucket_fields=bucket_fields))
|
| + if plurality_checkable_iterator.IsEmpty():
|
| + raise CommandException('No URLs matched')
|
| + return plurality_checkable_iterator
|
| +
|
| + ######################
|
| + # Private functions. #
|
| + ######################
|
| +
|
| + def _ResetConnectionPool(self):
|
| + # Each OS process needs to establish its own set of connections to
|
| + # the server to avoid writes from different OS processes interleaving
|
| + # onto the same socket (and garbling the underlying SSL session).
|
| + # We ensure each process gets its own set of connections here by
|
| + # closing all connections in the storage provider connection pool.
|
| + connection_pool = StorageUri.provider_pool
|
| + if connection_pool:
|
| + for i in connection_pool:
|
| + connection_pool[i].connection.close()
|
| +
|
| + def _GetProcessAndThreadCount(self, process_count, thread_count,
|
| + parallel_operations_override):
|
| + """Determines the values of process_count and thread_count.
|
| +
|
| + These values are used for parallel operations.
|
| + If we're not performing operations in parallel, then ignore
|
| + existing values and use process_count = thread_count = 1.
|
| +
|
| + Args:
|
| + process_count: A positive integer or None. In the latter case, we read
|
| + the value from the .boto config file.
|
| + thread_count: A positive integer or None. In the latter case, we read
|
| + the value from the .boto config file.
|
| + parallel_operations_override: Used to override self.parallel_operations.
|
| + This allows the caller to safely override
|
| + the top-level flag for a single call.
|
| +
|
| + Returns:
|
| + (process_count, thread_count): The number of processes and threads to use,
|
| + respectively.
|
| + """
|
| + # Set OS process and python thread count as a function of options
|
| + # and config.
|
| + if self.parallel_operations or parallel_operations_override:
|
| + if not process_count:
|
| + process_count = boto.config.getint(
|
| + 'GSUtil', 'parallel_process_count',
|
| + gslib.commands.config.DEFAULT_PARALLEL_PROCESS_COUNT)
|
| + if process_count < 1:
|
| + raise CommandException('Invalid parallel_process_count "%d".' %
|
| + process_count)
|
| + if not thread_count:
|
| + thread_count = boto.config.getint(
|
| + 'GSUtil', 'parallel_thread_count',
|
| + gslib.commands.config.DEFAULT_PARALLEL_THREAD_COUNT)
|
| + if thread_count < 1:
|
| + raise CommandException('Invalid parallel_thread_count "%d".' %
|
| + thread_count)
|
| + else:
|
| + # If -m not specified, then assume 1 OS process and 1 Python thread.
|
| + process_count = 1
|
| + thread_count = 1
|
| +
|
| + if IS_WINDOWS and process_count > 1:
|
| + raise CommandException('\n'.join(textwrap.wrap(
|
| + ('It is not possible to set process_count > 1 on Windows. Please '
|
| + 'update your config file (located at %s) and set '
|
| + '"parallel_process_count = 1".') %
|
| + GetConfigFilePath())))
|
| + self.logger.debug('process count: %d', process_count)
|
| + self.logger.debug('thread count: %d', thread_count)
|
| +
|
| + return (process_count, thread_count)
|
| +
|
| + def _SetUpPerCallerState(self):
|
| + """Set up the state for a caller id, corresponding to one Apply call."""
|
| + # Get a new caller ID.
|
| + with caller_id_lock:
|
| + caller_id_counter.value += 1
|
| + caller_id = caller_id_counter.value
|
| +
|
| + # Create a copy of self with an incremented recursive level. This allows
|
| + # the class to report its level correctly if the function called from it
|
| + # also needs to call Apply.
|
| + cls = copy.copy(self)
|
| + cls.recursive_apply_level += 1
|
| +
|
| + # Thread-safe loggers can't be pickled, so we will remove it here and
|
| + # recreate it later in the WorkerThread. This is not a problem since any
|
| + # logger with the same name will be treated as a singleton.
|
| + cls.logger = None
|
| +
|
| + # Likewise, the default API connection can't be pickled, but it is unused
|
| + # anyway as each thread gets its own API delegator.
|
| + cls.gsutil_api = None
|
| +
|
| + class_map[caller_id] = cls
|
| + total_tasks[caller_id] = -1 # -1 => the producer hasn't finished yet.
|
| + call_completed_map[caller_id] = False
|
| + caller_id_finished_count.Put(caller_id, 0)
|
| + global_return_values_map.Put(caller_id, [])
|
| + return caller_id
|
| +
|
| + def _CreateNewConsumerPool(self, num_processes, num_threads):
|
| + """Create a new pool of processes that call _ApplyThreads."""
|
| + processes = []
|
| + task_queue = _NewMultiprocessingQueue()
|
| + task_queues.append(task_queue)
|
| +
|
| + current_max_recursive_level.value += 1
|
| + if current_max_recursive_level.value > MAX_RECURSIVE_DEPTH:
|
| + raise CommandException('Recursion depth of Apply calls is too great.')
|
| + for _ in range(num_processes):
|
| + recursive_apply_level = len(consumer_pools)
|
| + p = multiprocessing.Process(
|
| + target=self._ApplyThreads,
|
| + args=(num_threads, num_processes, recursive_apply_level))
|
| + p.daemon = True
|
| + processes.append(p)
|
| + p.start()
|
| + consumer_pool = _ConsumerPool(processes, task_queue)
|
| + consumer_pools.append(consumer_pool)
|
| +
|
| + def Apply(self, func, args_iterator, exception_handler,
|
| + shared_attrs=None, arg_checker=_UrlArgChecker,
|
| + parallel_operations_override=False, process_count=None,
|
| + thread_count=None, should_return_results=False,
|
| + fail_on_error=False):
|
| + """Calls _Parallel/SequentialApply based on multiprocessing availability.
|
| +
|
| + Args:
|
| + func: Function to call to process each argument.
|
| + args_iterator: Iterable collection of arguments to be put into the
|
| + work queue.
|
| + exception_handler: Exception handler for WorkerThread class.
|
| + shared_attrs: List of attributes to manage across sub-processes.
|
| + arg_checker: Used to determine whether we should process the current
|
| + argument or simply skip it. Also handles any logging that
|
| + is specific to a particular type of argument.
|
| + parallel_operations_override: Used to override self.parallel_operations.
|
| + This allows the caller to safely override
|
| + the top-level flag for a single call.
|
| + process_count: The number of processes to use. If not specified, then
|
| + the configured default will be used.
|
| + thread_count: The number of threads per process. If not speficied, then
|
| + the configured default will be used..
|
| + should_return_results: If true, then return the results of all successful
|
| + calls to func in a list.
|
| + fail_on_error: If true, then raise any exceptions encountered when
|
| + executing func. This is only applicable in the case of
|
| + process_count == thread_count == 1.
|
| +
|
| + Returns:
|
| + Results from spawned threads.
|
| + """
|
| + if shared_attrs:
|
| + original_shared_vars_values = {} # We'll add these back in at the end.
|
| + for name in shared_attrs:
|
| + original_shared_vars_values[name] = getattr(self, name)
|
| + # By setting this to 0, we simplify the logic for computing deltas.
|
| + # We'll add it back after all of the tasks have been performed.
|
| + setattr(self, name, 0)
|
| +
|
| + (process_count, thread_count) = self._GetProcessAndThreadCount(
|
| + process_count, thread_count, parallel_operations_override)
|
| +
|
| + is_main_thread = (self.recursive_apply_level == 0
|
| + and self.sequential_caller_id == -1)
|
| +
|
| + # We don't honor the fail_on_error flag in the case of multiple threads
|
| + # or processes.
|
| + fail_on_error = fail_on_error and (process_count * thread_count == 1)
|
| +
|
| + # Only check this from the first call in the main thread. Apart from the
|
| + # fact that it's wasteful to try this multiple times in general, it also
|
| + # will never work when called from a subprocess since we use daemon
|
| + # processes, and daemons can't create other processes.
|
| + if is_main_thread:
|
| + if ((not self.multiprocessing_is_available)
|
| + and thread_count * process_count > 1):
|
| + # Run the check again and log the appropriate warnings. This was run
|
| + # before, when the Command object was created, in order to calculate
|
| + # self.multiprocessing_is_available, but we don't want to print the
|
| + # warning until we're sure the user actually tried to use multiple
|
| + # threads or processes.
|
| + MultiprocessingIsAvailable(logger=self.logger)
|
| +
|
| + if self.multiprocessing_is_available:
|
| + caller_id = self._SetUpPerCallerState()
|
| + else:
|
| + self.sequential_caller_id += 1
|
| + caller_id = self.sequential_caller_id
|
| +
|
| + if is_main_thread:
|
| + # pylint: disable=global-variable-undefined
|
| + global global_return_values_map, shared_vars_map, failure_count
|
| + global caller_id_finished_count, shared_vars_list_map
|
| + global_return_values_map = BasicIncrementDict()
|
| + global_return_values_map.Put(caller_id, [])
|
| + shared_vars_map = BasicIncrementDict()
|
| + caller_id_finished_count = BasicIncrementDict()
|
| + shared_vars_list_map = {}
|
| + failure_count = 0
|
| +
|
| + # If any shared attributes passed by caller, create a dictionary of
|
| + # shared memory variables for every element in the list of shared
|
| + # attributes.
|
| + if shared_attrs:
|
| + shared_vars_list_map[caller_id] = shared_attrs
|
| + for name in shared_attrs:
|
| + shared_vars_map.Put((caller_id, name), 0)
|
| +
|
| + # Make all of the requested function calls.
|
| + if self.multiprocessing_is_available and thread_count * process_count > 1:
|
| + self._ParallelApply(func, args_iterator, exception_handler, caller_id,
|
| + arg_checker, process_count, thread_count,
|
| + should_return_results, fail_on_error)
|
| + else:
|
| + self._SequentialApply(func, args_iterator, exception_handler, caller_id,
|
| + arg_checker, should_return_results, fail_on_error)
|
| +
|
| + if shared_attrs:
|
| + for name in shared_attrs:
|
| + # This allows us to retain the original value of the shared variable,
|
| + # and simply apply the delta after what was done during the call to
|
| + # apply.
|
| + final_value = (original_shared_vars_values[name] +
|
| + shared_vars_map.Get((caller_id, name)))
|
| + setattr(self, name, final_value)
|
| +
|
| + if should_return_results:
|
| + return global_return_values_map.Get(caller_id)
|
| +
|
| + def _MaybeSuggestGsutilDashM(self):
|
| + """Outputs a sugestion to the user to use gsutil -m."""
|
| + if not (boto.config.getint('GSUtil', 'parallel_process_count', 0) == 1 and
|
| + boto.config.getint('GSUtil', 'parallel_thread_count', 0) == 1):
|
| + self.logger.info('\n' + textwrap.fill(
|
| + '==> NOTE: You are performing a sequence of gsutil operations that '
|
| + 'may run significantly faster if you instead use gsutil -m %s ...\n'
|
| + 'Please see the -m section under "gsutil help options" for further '
|
| + 'information about when gsutil -m can be advantageous.'
|
| + % sys.argv[1]) + '\n')
|
| +
|
| + # pylint: disable=g-doc-args
|
| + def _SequentialApply(self, func, args_iterator, exception_handler, caller_id,
|
| + arg_checker, should_return_results, fail_on_error):
|
| + """Performs all function calls sequentially in the current thread.
|
| +
|
| + No other threads or processes will be spawned. This degraded functionality
|
| + is used when the multiprocessing module is not available or the user
|
| + requests only one thread and one process.
|
| + """
|
| + # Create a WorkerThread to handle all of the logic needed to actually call
|
| + # the function. Note that this thread will never be started, and all work
|
| + # is done in the current thread.
|
| + worker_thread = WorkerThread(None, False)
|
| + args_iterator = iter(args_iterator)
|
| + # Count of sequential calls that have been made. Used for producing
|
| + # suggestion to use gsutil -m.
|
| + sequential_call_count = 0
|
| + while True:
|
| +
|
| + # Try to get the next argument, handling any exceptions that arise.
|
| + try:
|
| + args = args_iterator.next()
|
| + except StopIteration, e:
|
| + break
|
| + except Exception, e: # pylint: disable=broad-except
|
| + _IncrementFailureCount()
|
| + if fail_on_error:
|
| + raise
|
| + else:
|
| + try:
|
| + exception_handler(self, e)
|
| + except Exception, _: # pylint: disable=broad-except
|
| + self.logger.debug(
|
| + 'Caught exception while handling exception for %s:\n%s',
|
| + func, traceback.format_exc())
|
| + continue
|
| +
|
| + sequential_call_count += 1
|
| + if sequential_call_count == OFFER_GSUTIL_M_SUGGESTION_THRESHOLD:
|
| + # Output suggestion near beginning of run, so user sees it early and can
|
| + # ^C and try gsutil -m.
|
| + self._MaybeSuggestGsutilDashM()
|
| + if arg_checker(self, args):
|
| + # Now that we actually have the next argument, perform the task.
|
| + task = Task(func, args, caller_id, exception_handler,
|
| + should_return_results, arg_checker, fail_on_error)
|
| + worker_thread.PerformTask(task, self)
|
| + if sequential_call_count >= gslib.util.GetTermLines():
|
| + # Output suggestion at end of long run, in case user missed it at the
|
| + # start and it scrolled off-screen.
|
| + self._MaybeSuggestGsutilDashM()
|
| +
|
| + # pylint: disable=g-doc-args
|
| + def _ParallelApply(self, func, args_iterator, exception_handler, caller_id,
|
| + arg_checker, process_count, thread_count,
|
| + should_return_results, fail_on_error):
|
| + """Dispatches input arguments across a thread/process pool.
|
| +
|
| + Pools are composed of parallel OS processes and/or Python threads,
|
| + based on options (-m or not) and settings in the user's config file.
|
| +
|
| + If only one OS process is requested/available, dispatch requests across
|
| + threads in the current OS process.
|
| +
|
| + In the multi-process case, we will create one pool of worker processes for
|
| + each level of the tree of recursive calls to Apply. E.g., if A calls
|
| + Apply(B), and B ultimately calls Apply(C) followed by Apply(D), then we
|
| + will only create two sets of worker processes - B will execute in the first,
|
| + and C and D will execute in the second. If C is then changed to call
|
| + Apply(E) and D is changed to call Apply(F), then we will automatically
|
| + create a third set of processes (lazily, when needed) that will be used to
|
| + execute calls to E and F. This might look something like:
|
| +
|
| + Pool1 Executes: B
|
| + / \
|
| + Pool2 Executes: C D
|
| + / \
|
| + Pool3 Executes: E F
|
| +
|
| + Apply's parallelism is generally broken up into 4 cases:
|
| + - If process_count == thread_count == 1, then all tasks will be executed
|
| + by _SequentialApply.
|
| + - If process_count > 1 and thread_count == 1, then the main thread will
|
| + create a new pool of processes (if they don't already exist) and each of
|
| + those processes will execute the tasks in a single thread.
|
| + - If process_count == 1 and thread_count > 1, then this process will create
|
| + a new pool of threads to execute the tasks.
|
| + - If process_count > 1 and thread_count > 1, then the main thread will
|
| + create a new pool of processes (if they don't already exist) and each of
|
| + those processes will, upon creation, create a pool of threads to
|
| + execute the tasks.
|
| +
|
| + Args:
|
| + caller_id: The caller ID unique to this call to command.Apply.
|
| + See command.Apply for description of other arguments.
|
| + """
|
| + is_main_thread = self.recursive_apply_level == 0
|
| +
|
| + # Catch SIGINT and SIGTERM under Linux/MacOs so we can do cleanup before
|
| + # exiting.
|
| + if not IS_WINDOWS and is_main_thread:
|
| + # Register as a final signal handler because this handler kills the
|
| + # main gsutil process (so it must run last).
|
| + RegisterSignalHandler(signal.SIGINT, self._HandleMultiProcessingSigs,
|
| + is_final_handler=True)
|
| + RegisterSignalHandler(signal.SIGTERM, self._HandleMultiProcessingSigs,
|
| + is_final_handler=True)
|
| +
|
| + if not task_queues:
|
| + # The process we create will need to access the next recursive level
|
| + # of task queues if it makes a call to Apply, so we always keep around
|
| + # one more queue than we know we need. OTOH, if we don't create a new
|
| + # process, the existing process still needs a task queue to use.
|
| + task_queues.append(_NewMultiprocessingQueue())
|
| +
|
| + if process_count > 1: # Handle process pool creation.
|
| + # Check whether this call will need a new set of workers.
|
| +
|
| + # Each worker must acquire a shared lock before notifying the main thread
|
| + # that it needs a new worker pool, so that at most one worker asks for
|
| + # a new worker pool at once.
|
| + try:
|
| + if not is_main_thread:
|
| + worker_checking_level_lock.acquire()
|
| + if self.recursive_apply_level >= current_max_recursive_level.value:
|
| + with need_pool_or_done_cond:
|
| + # Only the main thread is allowed to create new processes -
|
| + # otherwise, we will run into some Python bugs.
|
| + if is_main_thread:
|
| + self._CreateNewConsumerPool(process_count, thread_count)
|
| + else:
|
| + # Notify the main thread that we need a new consumer pool.
|
| + new_pool_needed.value = 1
|
| + need_pool_or_done_cond.notify_all()
|
| + # The main thread will notify us when it finishes.
|
| + need_pool_or_done_cond.wait()
|
| + finally:
|
| + if not is_main_thread:
|
| + worker_checking_level_lock.release()
|
| +
|
| + # If we're running in this process, create a separate task queue. Otherwise,
|
| + # if Apply has already been called with process_count > 1, then there will
|
| + # be consumer pools trying to use our processes.
|
| + if process_count > 1:
|
| + task_queue = task_queues[self.recursive_apply_level]
|
| + else:
|
| + task_queue = _NewMultiprocessingQueue()
|
| +
|
| + # Kick off a producer thread to throw tasks in the global task queue. We
|
| + # do this asynchronously so that the main thread can be free to create new
|
| + # consumer pools when needed (otherwise, any thread with a task that needs
|
| + # a new consumer pool must block until we're completely done producing; in
|
| + # the worst case, every worker blocks on such a call and the producer fills
|
| + # up the task queue before it finishes, so we block forever).
|
| + producer_thread = ProducerThread(copy.copy(self), args_iterator, caller_id,
|
| + func, task_queue, should_return_results,
|
| + exception_handler, arg_checker,
|
| + fail_on_error)
|
| +
|
| + if process_count > 1:
|
| + # Wait here until either:
|
| + # 1. We're the main thread and someone needs a new consumer pool - in
|
| + # which case we create one and continue waiting.
|
| + # 2. Someone notifies us that all of the work we requested is done, in
|
| + # which case we retrieve the results (if applicable) and stop
|
| + # waiting.
|
| + while True:
|
| + with need_pool_or_done_cond:
|
| + # Either our call is done, or someone needs a new level of consumer
|
| + # pools, or we the wakeup call was meant for someone else. It's
|
| + # impossible for both conditions to be true, since the main thread is
|
| + # blocked on any other ongoing calls to Apply, and a thread would not
|
| + # ask for a new consumer pool unless it had more work to do.
|
| + if call_completed_map[caller_id]:
|
| + break
|
| + elif is_main_thread and new_pool_needed.value:
|
| + new_pool_needed.value = 0
|
| + self._CreateNewConsumerPool(process_count, thread_count)
|
| + need_pool_or_done_cond.notify_all()
|
| +
|
| + # Note that we must check the above conditions before the wait() call;
|
| + # otherwise, the notification can happen before we start waiting, in
|
| + # which case we'll block forever.
|
| + need_pool_or_done_cond.wait()
|
| + else: # Using a single process.
|
| + self._ApplyThreads(thread_count, process_count,
|
| + self.recursive_apply_level,
|
| + is_blocking_call=True, task_queue=task_queue)
|
| +
|
| + # We encountered an exception from the producer thread before any arguments
|
| + # were enqueued, but it wouldn't have been propagated, so we'll now
|
| + # explicitly raise it here.
|
| + if producer_thread.unknown_exception:
|
| + # pylint: disable=raising-bad-type
|
| + raise producer_thread.unknown_exception
|
| +
|
| + # We encountered an exception from the producer thread while iterating over
|
| + # the arguments, so raise it here if we're meant to fail on error.
|
| + if producer_thread.iterator_exception and fail_on_error:
|
| + # pylint: disable=raising-bad-type
|
| + raise producer_thread.iterator_exception
|
| +
|
| + def _ApplyThreads(self, thread_count, process_count, recursive_apply_level,
|
| + is_blocking_call=False, task_queue=None):
|
| + """Assigns the work from the multi-process global task queue.
|
| +
|
| + Work is assigned to an individual process for later consumption either by
|
| + the WorkerThreads or (if thread_count == 1) this thread.
|
| +
|
| + Args:
|
| + thread_count: The number of threads used to perform the work. If 1, then
|
| + perform all work in this thread.
|
| + process_count: The number of processes used to perform the work.
|
| + recursive_apply_level: The depth in the tree of recursive calls to Apply
|
| + of this thread.
|
| + is_blocking_call: True iff the call to Apply is blocked on this call
|
| + (which is true iff process_count == 1), implying that
|
| + _ApplyThreads must behave as a blocking call.
|
| + """
|
| + self._ResetConnectionPool()
|
| + self.recursive_apply_level = recursive_apply_level
|
| +
|
| + task_queue = task_queue or task_queues[recursive_apply_level]
|
| +
|
| + assert thread_count * process_count > 1, (
|
| + 'Invalid state, calling command._ApplyThreads with only one thread '
|
| + 'and process.')
|
| + worker_pool = WorkerPool(
|
| + thread_count, self.logger,
|
| + bucket_storage_uri_class=self.bucket_storage_uri_class,
|
| + gsutil_api_map=self.gsutil_api_map, debug=self.debug)
|
| +
|
| + num_enqueued = 0
|
| + while True:
|
| + task = task_queue.get()
|
| + if task.args != ZERO_TASKS_TO_DO_ARGUMENT:
|
| + # If we have no tasks to do and we're performing a blocking call, we
|
| + # need a special signal to tell us to stop - otherwise, we block on
|
| + # the call to task_queue.get() forever.
|
| + worker_pool.AddTask(task)
|
| + num_enqueued += 1
|
| +
|
| + if is_blocking_call:
|
| + num_to_do = total_tasks[task.caller_id]
|
| + # The producer thread won't enqueue the last task until after it has
|
| + # updated total_tasks[caller_id], so we know that num_to_do < 0 implies
|
| + # we will do this check again.
|
| + if num_to_do >= 0 and num_enqueued == num_to_do:
|
| + if thread_count == 1:
|
| + return
|
| + else:
|
| + while True:
|
| + with need_pool_or_done_cond:
|
| + if call_completed_map[task.caller_id]:
|
| + # We need to check this first, in case the condition was
|
| + # notified before we grabbed the lock.
|
| + return
|
| + need_pool_or_done_cond.wait()
|
| +
|
| +
|
| +# Below here lie classes and functions related to controlling the flow of tasks
|
| +# between various threads and processes.
|
| +
|
| +
|
| +class _ConsumerPool(object):
|
| +
|
| + def __init__(self, processes, task_queue):
|
| + self.processes = processes
|
| + self.task_queue = task_queue
|
| +
|
| + def ShutDown(self):
|
| + for process in self.processes:
|
| + KillProcess(process.pid)
|
| +
|
| +
|
| +def KillProcess(pid):
|
| + """Make best effort to kill the given process.
|
| +
|
| + We ignore all exceptions so a caller looping through a list of processes will
|
| + continue attempting to kill each, even if one encounters a problem.
|
| +
|
| + Args:
|
| + pid: The process ID.
|
| + """
|
| + try:
|
| + # os.kill doesn't work in 2.X or 3.Y on Windows for any X < 7 or Y < 2.
|
| + if IS_WINDOWS and ((2, 6) <= sys.version_info[:3] < (2, 7) or
|
| + (3, 0) <= sys.version_info[:3] < (3, 2)):
|
| + kernel32 = ctypes.windll.kernel32
|
| + handle = kernel32.OpenProcess(1, 0, pid)
|
| + kernel32.TerminateProcess(handle, 0)
|
| + else:
|
| + os.kill(pid, signal.SIGKILL)
|
| + except: # pylint: disable=bare-except
|
| + pass
|
| +
|
| +
|
| +class Task(namedtuple('Task', (
|
| + 'func args caller_id exception_handler should_return_results arg_checker '
|
| + 'fail_on_error'))):
|
| + """Task class representing work to be completed.
|
| +
|
| + Args:
|
| + func: The function to be executed.
|
| + args: The arguments to func.
|
| + caller_id: The globally-unique caller ID corresponding to the Apply call.
|
| + exception_handler: The exception handler to use if the call to func fails.
|
| + should_return_results: True iff the results of this function should be
|
| + returned from the Apply call.
|
| + arg_checker: Used to determine whether we should process the current
|
| + argument or simply skip it. Also handles any logging that
|
| + is specific to a particular type of argument.
|
| + fail_on_error: If true, then raise any exceptions encountered when
|
| + executing func. This is only applicable in the case of
|
| + process_count == thread_count == 1.
|
| + """
|
| + pass
|
| +
|
| +
|
| +class ProducerThread(threading.Thread):
|
| + """Thread used to enqueue work for other processes and threads."""
|
| +
|
| + def __init__(self, cls, args_iterator, caller_id, func, task_queue,
|
| + should_return_results, exception_handler, arg_checker,
|
| + fail_on_error):
|
| + """Initializes the producer thread.
|
| +
|
| + Args:
|
| + cls: Instance of Command for which this ProducerThread was created.
|
| + args_iterator: Iterable collection of arguments to be put into the
|
| + work queue.
|
| + caller_id: Globally-unique caller ID corresponding to this call to Apply.
|
| + func: The function to be called on each element of args_iterator.
|
| + task_queue: The queue into which tasks will be put, to later be consumed
|
| + by Command._ApplyThreads.
|
| + should_return_results: True iff the results for this call to command.Apply
|
| + were requested.
|
| + exception_handler: The exception handler to use when errors are
|
| + encountered during calls to func.
|
| + arg_checker: Used to determine whether we should process the current
|
| + argument or simply skip it. Also handles any logging that
|
| + is specific to a particular type of argument.
|
| + fail_on_error: If true, then raise any exceptions encountered when
|
| + executing func. This is only applicable in the case of
|
| + process_count == thread_count == 1.
|
| + """
|
| + super(ProducerThread, self).__init__()
|
| + self.func = func
|
| + self.cls = cls
|
| + self.args_iterator = args_iterator
|
| + self.caller_id = caller_id
|
| + self.task_queue = task_queue
|
| + self.arg_checker = arg_checker
|
| + self.exception_handler = exception_handler
|
| + self.should_return_results = should_return_results
|
| + self.fail_on_error = fail_on_error
|
| + self.shared_variables_updater = _SharedVariablesUpdater()
|
| + self.daemon = True
|
| + self.unknown_exception = None
|
| + self.iterator_exception = None
|
| + self.start()
|
| +
|
| + def run(self):
|
| + num_tasks = 0
|
| + cur_task = None
|
| + last_task = None
|
| + try:
|
| + args_iterator = iter(self.args_iterator)
|
| + while True:
|
| + try:
|
| + args = args_iterator.next()
|
| + except StopIteration, e:
|
| + break
|
| + except Exception, e: # pylint: disable=broad-except
|
| + _IncrementFailureCount()
|
| + if self.fail_on_error:
|
| + self.iterator_exception = e
|
| + raise
|
| + else:
|
| + try:
|
| + self.exception_handler(self.cls, e)
|
| + except Exception, _: # pylint: disable=broad-except
|
| + self.cls.logger.debug(
|
| + 'Caught exception while handling exception for %s:\n%s',
|
| + self.func, traceback.format_exc())
|
| + self.shared_variables_updater.Update(self.caller_id, self.cls)
|
| + continue
|
| +
|
| + if self.arg_checker(self.cls, args):
|
| + num_tasks += 1
|
| + last_task = cur_task
|
| + cur_task = Task(self.func, args, self.caller_id,
|
| + self.exception_handler, self.should_return_results,
|
| + self.arg_checker, self.fail_on_error)
|
| + if last_task:
|
| + self.task_queue.put(last_task)
|
| + except Exception, e: # pylint: disable=broad-except
|
| + # This will also catch any exception raised due to an error in the
|
| + # iterator when fail_on_error is set, so check that we failed for some
|
| + # other reason before claiming that we had an unknown exception.
|
| + if not self.iterator_exception:
|
| + self.unknown_exception = e
|
| + finally:
|
| + # We need to make sure to update total_tasks[caller_id] before we enqueue
|
| + # the last task. Otherwise, a worker can retrieve the last task and
|
| + # complete it, then check total_tasks and determine that we're not done
|
| + # producing all before we update total_tasks. This approach forces workers
|
| + # to wait on the last task until after we've updated total_tasks.
|
| + total_tasks[self.caller_id] = num_tasks
|
| + if not cur_task:
|
| + # This happens if there were zero arguments to be put in the queue.
|
| + cur_task = Task(None, ZERO_TASKS_TO_DO_ARGUMENT, self.caller_id,
|
| + None, None, None, None)
|
| + self.task_queue.put(cur_task)
|
| +
|
| + # It's possible that the workers finished before we updated total_tasks,
|
| + # so we need to check here as well.
|
| + _NotifyIfDone(self.caller_id,
|
| + caller_id_finished_count.Get(self.caller_id))
|
| +
|
| +
|
| +class WorkerPool(object):
|
| + """Pool of worker threads to which tasks can be added."""
|
| +
|
| + def __init__(self, thread_count, logger, bucket_storage_uri_class=None,
|
| + gsutil_api_map=None, debug=0):
|
| + self.task_queue = _NewThreadsafeQueue()
|
| + self.threads = []
|
| + for _ in range(thread_count):
|
| + worker_thread = WorkerThread(
|
| + self.task_queue, logger,
|
| + bucket_storage_uri_class=bucket_storage_uri_class,
|
| + gsutil_api_map=gsutil_api_map, debug=debug)
|
| + self.threads.append(worker_thread)
|
| + worker_thread.start()
|
| +
|
| + def AddTask(self, task):
|
| + self.task_queue.put(task)
|
| +
|
| +
|
| +class WorkerThread(threading.Thread):
|
| + """Thread where all the work will be performed.
|
| +
|
| + This makes the function calls for Apply and takes care of all error handling,
|
| + return value propagation, and shared_vars.
|
| +
|
| + Note that this thread is NOT started upon instantiation because the function-
|
| + calling logic is also used in the single-threaded case.
|
| + """
|
| +
|
| + def __init__(self, task_queue, logger, bucket_storage_uri_class=None,
|
| + gsutil_api_map=None, debug=0):
|
| + """Initializes the worker thread.
|
| +
|
| + Args:
|
| + task_queue: The thread-safe queue from which this thread should obtain
|
| + its work.
|
| + logger: Logger to use for this thread.
|
| + bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
|
| + Settable for testing/mocking.
|
| + gsutil_api_map: Map of providers and API selector tuples to api classes
|
| + which can be used to communicate with those providers.
|
| + Used for the instantiating CloudApiDelegator class.
|
| + debug: debug level for the CloudApiDelegator class.
|
| + """
|
| + super(WorkerThread, self).__init__()
|
| + self.task_queue = task_queue
|
| + self.daemon = True
|
| + self.cached_classes = {}
|
| + self.shared_vars_updater = _SharedVariablesUpdater()
|
| +
|
| + self.thread_gsutil_api = None
|
| + if bucket_storage_uri_class and gsutil_api_map:
|
| + self.thread_gsutil_api = CloudApiDelegator(
|
| + bucket_storage_uri_class, gsutil_api_map, logger, debug=debug)
|
| +
|
| + def PerformTask(self, task, cls):
|
| + """Makes the function call for a task.
|
| +
|
| + Args:
|
| + task: The Task to perform.
|
| + cls: The instance of a class which gives context to the functions called
|
| + by the Task's function. E.g., see SetAclFuncWrapper.
|
| + """
|
| + caller_id = task.caller_id
|
| + try:
|
| + results = task.func(cls, task.args, thread_state=self.thread_gsutil_api)
|
| + if task.should_return_results:
|
| + global_return_values_map.Update(caller_id, [results], default_value=[])
|
| + except Exception, e: # pylint: disable=broad-except
|
| + _IncrementFailureCount()
|
| + if task.fail_on_error:
|
| + raise # Only happens for single thread and process case.
|
| + else:
|
| + try:
|
| + task.exception_handler(cls, e)
|
| + except Exception, _: # pylint: disable=broad-except
|
| + # Don't allow callers to raise exceptions here and kill the worker
|
| + # threads.
|
| + cls.logger.debug(
|
| + 'Caught exception while handling exception for %s:\n%s',
|
| + task, traceback.format_exc())
|
| + finally:
|
| + self.shared_vars_updater.Update(caller_id, cls)
|
| +
|
| + # Even if we encounter an exception, we still need to claim that that
|
| + # the function finished executing. Otherwise, we won't know when to
|
| + # stop waiting and return results.
|
| + num_done = caller_id_finished_count.Update(caller_id, 1)
|
| +
|
| + if cls.multiprocessing_is_available:
|
| + _NotifyIfDone(caller_id, num_done)
|
| +
|
| + def run(self):
|
| + while True:
|
| + task = self.task_queue.get()
|
| + caller_id = task.caller_id
|
| +
|
| + # Get the instance of the command with the appropriate context.
|
| + cls = self.cached_classes.get(caller_id, None)
|
| + if not cls:
|
| + cls = copy.copy(class_map[caller_id])
|
| + cls.logger = CreateGsutilLogger(cls.command_name)
|
| + self.cached_classes[caller_id] = cls
|
| +
|
| + self.PerformTask(task, cls)
|
| +
|
| +
|
| +class _SharedVariablesUpdater(object):
|
| + """Used to update shared variable for a class in the global map.
|
| +
|
| + Note that each thread will have its own instance of the calling class for
|
| + context, and it will also have its own instance of a
|
| + _SharedVariablesUpdater. This is used in the following way:
|
| +
|
| + 1. Before any tasks are performed, each thread will get a copy of the
|
| + calling class, and the globally-consistent value of this shared variable
|
| + will be initialized to whatever it was before the call to Apply began.
|
| +
|
| + 2. After each time a thread performs a task, it will look at the current
|
| + values of the shared variables in its instance of the calling class.
|
| +
|
| + 2.A. For each such variable, it computes the delta of this variable
|
| + between the last known value for this class (which is stored in
|
| + a dict local to this class) and the current value of the variable
|
| + in the class.
|
| +
|
| + 2.B. Using this delta, we update the last known value locally as well
|
| + as the globally-consistent value shared across all classes (the
|
| + globally consistent value is simply increased by the computed
|
| + delta).
|
| + """
|
| +
|
| + def __init__(self):
|
| + self.last_shared_var_values = {}
|
| +
|
| + def Update(self, caller_id, cls):
|
| + """Update any shared variables with their deltas."""
|
| + shared_vars = shared_vars_list_map.get(caller_id, None)
|
| + if shared_vars:
|
| + for name in shared_vars:
|
| + key = (caller_id, name)
|
| + last_value = self.last_shared_var_values.get(key, 0)
|
| + # Compute the change made since the last time we updated here. This is
|
| + # calculated by simply subtracting the last known value from the current
|
| + # value in the class instance.
|
| + delta = getattr(cls, name) - last_value
|
| + self.last_shared_var_values[key] = delta + last_value
|
| +
|
| + # Update the globally-consistent value by simply increasing it by the
|
| + # computed delta.
|
| + shared_vars_map.Update(key, delta)
|
| +
|
| +
|
| +def _NotifyIfDone(caller_id, num_done):
|
| + """Notify any threads waiting for results that something has finished.
|
| +
|
| + Each waiting thread will then need to check the call_completed_map to see if
|
| + its work is done.
|
| +
|
| + Note that num_done could be calculated here, but it is passed in as an
|
| + optimization so that we have one less call to a globally-locked data
|
| + structure.
|
| +
|
| + Args:
|
| + caller_id: The caller_id of the function whose progress we're checking.
|
| + num_done: The number of tasks currently completed for that caller_id.
|
| + """
|
| + num_to_do = total_tasks[caller_id]
|
| + if num_to_do == num_done and num_to_do >= 0:
|
| + # Notify the Apply call that's sleeping that it's ready to return.
|
| + with need_pool_or_done_cond:
|
| + call_completed_map[caller_id] = True
|
| + need_pool_or_done_cond.notify_all()
|
| +
|
| +
|
| +def ShutDownGsutil():
|
| + """Shut down all processes in consumer pools in preparation for exiting."""
|
| + for q in queues:
|
| + try:
|
| + q.cancel_join_thread()
|
| + except: # pylint: disable=bare-except
|
| + pass
|
| + for consumer_pool in consumer_pools:
|
| + consumer_pool.ShutDown()
|
| +
|
| +
|
| +# pylint: disable=global-variable-undefined
|
| +def _IncrementFailureCount():
|
| + global failure_count
|
| + if isinstance(failure_count, int):
|
| + failure_count += 1
|
| + else: # Otherwise it's a multiprocessing.Value() of type 'i'.
|
| + failure_count.value += 1
|
| +
|
| +
|
| +# pylint: disable=global-variable-undefined
|
| +def GetFailureCount():
|
| + """Returns the number of failures processed during calls to Apply()."""
|
| + try:
|
| + if isinstance(failure_count, int):
|
| + return failure_count
|
| + else: # It's a multiprocessing.Value() of type 'i'.
|
| + return failure_count.value
|
| + except NameError: # If it wasn't initialized, Apply() wasn't called.
|
| + return 0
|
| +
|
| +
|
| +def ResetFailureCount():
|
| + """Resets the failure_count variable to 0 - useful if error is expected."""
|
| + try:
|
| + global failure_count
|
| + if isinstance(failure_count, int):
|
| + failure_count = 0
|
| + else: # It's a multiprocessing.Value() of type 'i'.
|
| + failure_count = multiprocessing.Value('i', 0)
|
| + except NameError: # If it wasn't initialized, Apply() wasn't called.
|
| + pass
|
|
|