| Index: gslib/command.py
|
| ===================================================================
|
| --- gslib/command.py (revision 33376)
|
| +++ gslib/command.py (working copy)
|
| @@ -1,3 +1,4 @@
|
| +# -*- coding: utf-8 -*-
|
| # Copyright 2010 Google Inc. All Rights Reserved.
|
| #
|
| # Licensed under the Apache License, Version 2.0 (the "License");
|
| @@ -11,70 +12,83 @@
|
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| # See the License for the specific language governing permissions and
|
| # limitations under the License.
|
| -
|
| """Base class for gsutil commands.
|
|
|
| In addition to base class code, this file contains helpers that depend on base
|
| -class state (such as GetAclCommandHelper) In general, functions that depend on
|
| +class state (such as GetAndPrintAcl). In general, functions that depend on
|
| class state and that are used by multiple commands belong in this file.
|
| Functions that don't depend on class state belong in util.py, and non-shared
|
| helpers belong in individual subclasses.
|
| """
|
|
|
| -import boto
|
| +from __future__ import absolute_import
|
| +
|
| import codecs
|
| +from collections import namedtuple
|
| import copy
|
| import getopt
|
| -import gslib
|
| +from getopt import GetoptError
|
| import logging
|
| import multiprocessing
|
| import os
|
| import Queue
|
| -import re
|
| import signal
|
| import sys
|
| import textwrap
|
| import threading
|
| import traceback
|
| -import wildcard_iterator
|
| -import xml.dom.minidom
|
|
|
| -from boto import handler
|
| -from boto.exception import GSResponseError
|
| +import boto
|
| from boto.storage_uri import StorageUri
|
| -from collections import namedtuple
|
| -from getopt import GetoptError
|
| -from gslib import util
|
| +import gslib
|
| +from gslib.cloud_api import AccessDeniedException
|
| +from gslib.cloud_api import ArgumentException
|
| +from gslib.cloud_api import ServiceException
|
| +from gslib.cloud_api_delegator import CloudApiDelegator
|
| +from gslib.cs_api_map import ApiSelector
|
| +from gslib.cs_api_map import GsutilApiMapFactory
|
| from gslib.exception import CommandException
|
| from gslib.help_provider import HelpProvider
|
| from gslib.name_expansion import NameExpansionIterator
|
| -from gslib.name_expansion import NameExpansionIteratorQueue
|
| +from gslib.name_expansion import NameExpansionResult
|
| from gslib.parallelism_framework_util import AtomicIncrementDict
|
| from gslib.parallelism_framework_util import BasicIncrementDict
|
| from gslib.parallelism_framework_util import ThreadAndProcessSafeDict
|
| -from gslib.project_id import ProjectIdHandler
|
| -from gslib.storage_uri_builder import StorageUriBuilder
|
| +from gslib.plurality_checkable_iterator import PluralityCheckableIterator
|
| +from gslib.storage_url import StorageUrlFromString
|
| +from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
|
| +from gslib.translation_helper import AclTranslation
|
| from gslib.util import GetConfigFilePath
|
| +from gslib.util import HaveFileUrls
|
| +from gslib.util import HaveProviderUrls
|
| from gslib.util import IS_WINDOWS
|
| from gslib.util import MultiprocessingIsAvailable
|
| from gslib.util import NO_MAX
|
| -from gslib.wildcard_iterator import ContainsWildcard
|
| -from oauth2client.client import HAS_CRYPTO
|
| +from gslib.util import UrlsAreForSingleProvider
|
| +from gslib.util import UTF8
|
| +from gslib.wildcard_iterator import CreateWildcardIterator
|
|
|
| +OFFER_GSUTIL_M_SUGGESTION_THRESHOLD = 5
|
| +
|
| if IS_WINDOWS:
|
| - import ctypes
|
| + import ctypes # pylint: disable=g-import-not-at-top
|
|
|
|
|
| def _DefaultExceptionHandler(cls, e):
|
| cls.logger.exception(e)
|
|
|
| +
|
| def CreateGsutilLogger(command_name):
|
| - """Creates a logger that resembles 'print' output, but is thread safe and
|
| - abides by gsutil -d/-D/-DD/-q options.
|
| + """Creates a logger that resembles 'print' output.
|
|
|
| + This logger abides by gsutil -d/-D/-DD/-q options.
|
| +
|
| By default (if none of the above options is specified) the logger will display
|
| all messages logged with level INFO or above. Log propagation is disabled.
|
|
|
| + Args:
|
| + command_name: Command name to create logger for.
|
| +
|
| Returns:
|
| A logger object.
|
| """
|
| @@ -89,25 +103,26 @@
|
| log.addHandler(log_handler)
|
| return log
|
|
|
| -def _UriArgChecker(command_instance, uri):
|
| - exp_src_uri = command_instance.suri_builder.StorageUri(
|
| - uri.GetExpandedUriStr())
|
| - command_instance.logger.debug('process %d is handling uri %s',
|
| - os.getpid(), exp_src_uri)
|
| - if (command_instance.exclude_symlinks and exp_src_uri.is_file_uri()
|
| - and os.path.islink(exp_src_uri.object_name)):
|
| - command_instance.logger.info('Skipping symbolic link %s...', exp_src_uri)
|
| +
|
| +def _UrlArgChecker(command_instance, url):
|
| + if not command_instance.exclude_symlinks:
|
| + return True
|
| + exp_src_url = url.expanded_storage_url
|
| + if exp_src_url.IsFileUrl() and os.path.islink(exp_src_url.object_name):
|
| + command_instance.logger.info('Skipping symbolic link %s...', exp_src_url)
|
| return False
|
| return True
|
|
|
|
|
| -def DummyArgChecker(command_instance, arg):
|
| +def DummyArgChecker(*unused_args):
|
| return True
|
|
|
| -def _SetAclFuncWrapper(cls, name_expansion_result):
|
| - return cls._SetAclFunc(name_expansion_result)
|
|
|
| -def _SetAclExceptionHandler(cls, e):
|
| +def SetAclFuncWrapper(cls, name_expansion_result, thread_state=None):
|
| + return cls.SetAclFunc(name_expansion_result, thread_state=thread_state)
|
| +
|
| +
|
| +def SetAclExceptionHandler(cls, e):
|
| """Exception handler that maintains state about post-completion status."""
|
| cls.logger.error(str(e))
|
| cls.everything_set_okay = False
|
| @@ -120,27 +135,18 @@
|
| # However, this also lets us shut down somewhat more cleanly when interrupted.
|
| queues = []
|
|
|
| +
|
| def _NewMultiprocessingQueue():
|
| queue = multiprocessing.Queue(MAX_QUEUE_SIZE)
|
| queues.append(queue)
|
| return queue
|
|
|
| +
|
| def _NewThreadsafeQueue():
|
| queue = Queue.Queue(MAX_QUEUE_SIZE)
|
| queues.append(queue)
|
| return queue
|
|
|
| -
|
| -# command_spec key constants.
|
| -COMMAND_NAME = 'command_name'
|
| -COMMAND_NAME_ALIASES = 'command_name_aliases'
|
| -MIN_ARGS = 'min_args'
|
| -MAX_ARGS = 'max_args'
|
| -SUPPORTED_SUB_ARGS = 'supported_sub_args'
|
| -FILE_URIS_OK = 'file_uri_ok'
|
| -PROVIDER_URIS_OK = 'provider_uri_ok'
|
| -URIS_START_ARG = 'uris_start_arg'
|
| -
|
| # The maximum size of a process- or thread-safe queue. Imposing this limit
|
| # prevents us from needing to hold an arbitrary amount of data in memory.
|
| # However, setting this number too high (e.g., >= 32768 on OS X) can cause
|
| @@ -152,7 +158,7 @@
|
| # causing problems with infinite recursion, and it can be increased if needed.
|
| MAX_RECURSIVE_DEPTH = 5
|
|
|
| -ZERO_TASKS_TO_DO_ARGUMENT = ("There were no", "tasks to do")
|
| +ZERO_TASKS_TO_DO_ARGUMENT = ('There were no', 'tasks to do')
|
|
|
| # Map from deprecated aliases to the current command and subcommands that
|
| # provide the same behavior.
|
| @@ -177,27 +183,29 @@
|
| # Declare all of the module level variables - see
|
| # InitializeMultiprocessingVariables for an explanation of why this is
|
| # necessary.
|
| +# pylint: disable=global-at-module-level
|
| global manager, consumer_pools, task_queues, caller_id_lock, caller_id_counter
|
| global total_tasks, call_completed_map, global_return_values_map
|
| global need_pool_or_done_cond, caller_id_finished_count, new_pool_needed
|
| global current_max_recursive_level, shared_vars_map, shared_vars_list_map
|
| -global class_map
|
| +global class_map, worker_checking_level_lock, failure_count
|
|
|
|
|
| def InitializeMultiprocessingVariables():
|
| - """
|
| - Used to initialize module-level variables that will be inherited by
|
| - subprocesses. On Windows, a multiprocessing.Manager object should only
|
| + """Initializes module-level variables that will be inherited by subprocesses.
|
| +
|
| + On Windows, a multiprocessing.Manager object should only
|
| be created within an "if __name__ == '__main__':" block. This function
|
| must be called, otherwise every command that calls Command.Apply will fail.
|
| """
|
| # This list of global variables must exactly match the above list of
|
| # declarations.
|
| + # pylint: disable=global-variable-undefined
|
| global manager, consumer_pools, task_queues, caller_id_lock, caller_id_counter
|
| global total_tasks, call_completed_map, global_return_values_map
|
| global need_pool_or_done_cond, caller_id_finished_count, new_pool_needed
|
| global current_max_recursive_level, shared_vars_map, shared_vars_list_map
|
| - global class_map
|
| + global class_map, worker_checking_level_lock, failure_count
|
|
|
| manager = multiprocessing.Manager()
|
|
|
| @@ -225,6 +233,10 @@
|
| # that a call to Apply needs a new set of consumer processes.
|
| need_pool_or_done_cond = manager.Condition()
|
|
|
| + # Lock used to prevent multiple worker processes from asking the main thread
|
| + # to create a new consumer pool for the same level.
|
| + worker_checking_level_lock = manager.Lock()
|
| +
|
| # Map from caller_id to the current number of completed tasks for that ID.
|
| caller_id_finished_count = AtomicIncrementDict(manager)
|
|
|
| @@ -242,83 +254,47 @@
|
| # Map from caller_id to calling class.
|
| class_map = manager.dict()
|
|
|
| + # Number of tasks that resulted in an exception in calls to Apply().
|
| + failure_count = multiprocessing.Value('i', 0)
|
|
|
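| To make the Windows constraint above concrete, a caller's entry point would
| look roughly like this (a sketch; the real entry point lives in gsutil's
| main module, and main() here is hypothetical):
|
|   if __name__ == '__main__':
|     # Must run before any Command.Apply call, and only from the main module:
|     # on Windows, multiprocessing.Manager() cannot be created in a spawned
|     # child process.
|     InitializeMultiprocessingVariables()
|     sys.exit(main())
|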
| -class Command(object):
|
| - REQUIRED_SPEC_KEYS = [COMMAND_NAME]
|
|
|
| - # Each subclass must define the following map, minimally including the
|
| - # keys in REQUIRED_SPEC_KEYS; other values below will be used as defaults,
|
| - # although for readbility subclasses should specify the complete map.
|
| - command_spec = {
|
| +# Each subclass of Command must define a property named 'command_spec' that is
|
| +# an instance of the following class.
|
| +CommandSpec = namedtuple('CommandSpec', [
|
| # Name of command.
|
| - COMMAND_NAME : None,
|
| + 'command_name',
|
| # List of command name aliases.
|
| - COMMAND_NAME_ALIASES : [],
|
| + 'command_name_aliases',
|
| # Min number of args required by this command.
|
| - MIN_ARGS : 0,
|
| + 'min_args',
|
| # Max number of args required by this command, or NO_MAX.
|
| - MAX_ARGS : NO_MAX,
|
| + 'max_args',
|
| # Getopt-style string specifying acceptable sub args.
|
| - SUPPORTED_SUB_ARGS : '',
|
| - # True if file URIs are acceptable for this command.
|
| - FILE_URIS_OK : False,
|
| - # True if provider-only URIs are acceptable for this command.
|
| - PROVIDER_URIS_OK : False,
|
| - # Index in args of first URI arg.
|
| - URIS_START_ARG : 0,
|
| - }
|
| - _default_command_spec = command_spec
|
| - help_spec = HelpProvider.help_spec
|
| - _commands_with_subcommands_and_subopts = ['acl', 'defacl', 'logging', 'web']
|
| + 'supported_sub_args',
|
| + # True if file URLs are acceptable for this command.
|
| + 'file_url_ok',
|
| + # True if provider-only URLs are acceptable for this command.
|
| + 'provider_url_ok',
|
| + # Index in args of first URL arg.
|
| + 'urls_start_arg',
|
| + # List of supported APIs
|
| + 'gs_api_support',
|
| + # Default API to use for this command
|
| + 'gs_default_api',
|
| + # Private arguments (for internal testing)
|
| + 'supported_private_args',
|
| +])
|
|
|
| - """Define an empty test specification, which derived classes must populate.
|
|
|
| - This is a list of tuples containing the following values:
|
| +class Command(HelpProvider):
|
| + """Base class for all gsutil commands."""
|
|
|
| - step_name - mnemonic name for test, displayed when test is run
|
| - cmd_line - shell command line to run test
|
| - expect_ret or None - expected return code from test (None means ignore)
|
| - (result_file, expect_file) or None - tuple of result file and expected
|
| - file to diff for additional test
|
| - verification beyond the return code
|
| - (None means no diff requested)
|
| - Notes:
|
| + # Each subclass must override this with an instance of CommandSpec.
|
| + command_spec = None
|
|
|
| - - Setting expected_ret to None means there is no expectation and,
|
| - hence, any returned value will pass.
|
| + _commands_with_subcommands_and_subopts = ['acl', 'defacl', 'logging', 'web',
|
| + 'notification']
|
|
|
| - - Any occurrences of the string 'gsutil' in the cmd_line parameter
|
| - are expanded to the full path to the gsutil command under test.
|
| -
|
| - - The cmd_line, result_file and expect_file parameters may
|
| - contain the following special substrings:
|
| -
|
| - $Bn - converted to one of 10 unique-for-testing bucket names (n=0..9)
|
| - $On - converted to one of 10 unique-for-testing object names (n=0..9)
|
| - $Fn - converted to one of 10 unique-for-testing file names (n=0..9)
|
| - $G - converted to the directory where gsutil is installed. Useful for
|
| - referencing test data.
|
| -
|
| - - The generated file names are full pathnames, whereas the generated
|
| - bucket and object names are simple relative names.
|
| -
|
| - - Tests with a non-None result_file and expect_file automatically
|
| - trigger an implicit diff of the two files.
|
| -
|
| - - These test specifications, in combination with the conversion strings
|
| - allow tests to be constructed parametrically. For example, here's an
|
| - annotated subset of a test_steps for the cp command:
|
| -
|
| - # Copy local file to object, verify 0 return code.
|
| - ('simple cp', 'gsutil cp $F1 gs://$B1/$O1', 0, None, None),
|
| - # Copy uploaded object back to local file and diff vs. orig file.
|
| - ('verify cp', 'gsutil cp gs://$B1/$O1 $F2', 0, '$F2', '$F1'),
|
| -
|
| - - After pattern substitution, the specs are run sequentially, in the
|
| - order in which they appear in the test_steps list.
|
| - """
|
| - test_steps = []
|
| -
|
| # This keeps track of the recursive depth of the current call to Apply.
|
| recursive_apply_level = 0
|
|
|
| @@ -326,48 +302,71 @@
|
| # of the caller_id.
|
| sequential_caller_id = -1
|
|
|
| + @staticmethod
|
| + def CreateCommandSpec(command_name, command_name_aliases=None, min_args=0,
|
| + max_args=NO_MAX, supported_sub_args='',
|
| + file_url_ok=False, provider_url_ok=False,
|
| + urls_start_arg=0, gs_api_support=None,
|
| + gs_default_api=None, supported_private_args=None):
|
| + """Creates an instance of CommandSpec, with defaults."""
|
| + return CommandSpec(
|
| + command_name=command_name,
|
| + command_name_aliases=command_name_aliases or [],
|
| + min_args=min_args,
|
| + max_args=max_args,
|
| + supported_sub_args=supported_sub_args,
|
| + file_url_ok=file_url_ok,
|
| + provider_url_ok=provider_url_ok,
|
| + urls_start_arg=urls_start_arg,
|
| + gs_api_support=gs_api_support or [ApiSelector.XML],
|
| + gs_default_api=gs_default_api or ApiSelector.XML,
|
| + supported_private_args=supported_private_args)
|
| +
|
| # Define a convenience property for command name, since it's used many places.
|
| def _GetDefaultCommandName(self):
|
| - return self.command_spec[COMMAND_NAME]
|
| + return self.command_spec.command_name
|
| command_name = property(_GetDefaultCommandName)
|
| -
|
| - def _CalculateUrisStartArg(self):
|
| - """Calculate the index in args of the first URI arg. By default, just use
|
| - the value from command_spec.
|
| +
|
| + def _CalculateUrlsStartArg(self):
|
| + """Calculate the index in args of the first URL arg.
|
| +
|
| + Returns:
|
| + Index of the first URL arg (according to the command spec).
|
| """
|
| - return self.command_spec[URIS_START_ARG]
|
| + return self.command_spec.urls_start_arg
|
|
|
| def _TranslateDeprecatedAliases(self, args):
|
| - """For commands that have deprecated aliases, this will map the aliases to
|
| - the corresponding new command and also warn the user about deprecation.
|
| - """
|
| + """Map deprecated aliases to the corresponding new command, and warn."""
|
| new_command_args = OLD_ALIAS_MAP.get(self.command_alias_used, None)
|
| if new_command_args:
|
| # Prepend any subcommands for the new command. The command name itself
|
| # is not part of the args, so leave it out.
|
| args = new_command_args[1:] + args
|
| - self.logger.warn('\n'.join(textwrap.wrap((
|
| - 'You are using a deprecated alias, "%(used_alias)s", for the '
|
| - '"%(command_name)s" command. This will stop working on 9/9/2014. '
|
| - 'Please use "%(command_name)s" with the appropriate sub-command in '
|
| - 'the future. See "gsutil help %(command_name)s" for details.') %
|
| + self.logger.warn('\n'.join(textwrap.wrap(
|
| + ('You are using a deprecated alias, "%(used_alias)s", for the '
|
| + '"%(command_name)s" command. This will stop working on 9/9/2014. '
|
| + 'Please use "%(command_name)s" with the appropriate sub-command in '
|
| + 'the future. See "gsutil help %(command_name)s" for details.') %
|
| {'used_alias': self.command_alias_used,
|
| - 'command_name': self.command_name })))
|
| + 'command_name': self.command_name})))
|
| return args
|
|
|
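| To make the translation concrete, OLD_ALIAS_MAP (defined outside this hunk)
| pairs each deprecated alias with its replacement command and sub-command; a
| representative entry and its effect, as a sketch:
|
|   # Representative entry; see OLD_ALIAS_MAP for the full table.
|   OLD_ALIAS_MAP = {'setacl': ['acl', 'set']}
|   # With command_alias_used == 'setacl' and args == ['private', 'gs://bkt'],
|   # this method returns ['set', 'private', 'gs://bkt'] (the command name
|   # itself is dropped) and logs the deprecation warning above.
|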
| def __init__(self, command_runner, args, headers, debug, parallel_operations,
|
| - config_file_list, bucket_storage_uri_class, test_method=None,
|
| - logging_filters=None, command_alias_used=None):
|
| - """
|
| + bucket_storage_uri_class, gsutil_api_class_map_factory,
|
| + test_method=None, logging_filters=None,
|
| + command_alias_used=None):
|
| + """Instantiates a Command.
|
| +
|
| Args:
|
| command_runner: CommandRunner (for commands built atop other commands).
|
| args: Command-line args (arg0 = actual arg, not command name ala bash).
|
| headers: Dictionary containing optional HTTP headers to pass to boto.
|
| debug: Debug level to pass in to boto connection (range 0..3).
|
| parallel_operations: Should command operations be executed in parallel?
|
| - config_file_list: Config file list returned by GetBotoConfigFileList().
|
| bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
|
| Settable for testing/mocking.
|
| + gsutil_api_class_map_factory: Creates map of cloud storage interfaces.
|
| + Settable for testing/mocking.
|
| test_method: Optional general purpose method for testing purposes.
|
| Application and semantics of this method will vary by
|
| command and test type.
|
| @@ -389,8 +388,8 @@
|
| self.headers = headers
|
| self.debug = debug
|
| self.parallel_operations = parallel_operations
|
| - self.config_file_list = config_file_list
|
| self.bucket_storage_uri_class = bucket_storage_uri_class
|
| + self.gsutil_api_class_map_factory = gsutil_api_class_map_factory
|
| self.test_method = test_method
|
| self.exclude_symlinks = False
|
| self.recursion_requested = False
|
| @@ -400,61 +399,59 @@
|
| # Global instance of a threaded logger object.
|
| self.logger = CreateGsutilLogger(self.command_name)
|
| if logging_filters:
|
| - for filter in logging_filters:
|
| - self.logger.addFilter(filter)
|
| + for log_filter in logging_filters:
|
| + self.logger.addFilter(log_filter)
|
|
|
| - # Process sub-command instance specifications.
|
| - # First, ensure subclass implementation sets all required keys.
|
| - for k in self.REQUIRED_SPEC_KEYS:
|
| - if k not in self.command_spec or self.command_spec[k] is None:
|
| - raise CommandException('"%s" command implementation is missing %s '
|
| - 'specification' % (self.command_name, k))
|
| - # Now override default command_spec with subclass-specified values.
|
| - tmp = self._default_command_spec
|
| - tmp.update(self.command_spec)
|
| - self.command_spec = tmp
|
| - del tmp
|
| + if self.command_spec is None:
|
| + raise CommandException('"%s" command implementation is missing a '
|
| + 'command_spec definition.' % self.command_name)
|
|
|
| - # Make sure command provides a test specification.
|
| - if not self.test_steps:
|
| - # TODO: Uncomment following lines when test feature is ready.
|
| - #raise CommandException('"%s" command implementation is missing test '
|
| - #'specification' % self.command_name)
|
| - pass
|
| -
|
| # Parse and validate args.
|
| args = self._TranslateDeprecatedAliases(args)
|
| try:
|
| (self.sub_opts, self.args) = getopt.getopt(
|
| - args, self.command_spec[SUPPORTED_SUB_ARGS])
|
| + args, self.command_spec.supported_sub_args,
|
| + self.command_spec.supported_private_args or [])
|
| except GetoptError, e:
|
| raise CommandException('%s for "%s" command.' % (e.msg,
|
| self.command_name))
|
| - self.command_spec[URIS_START_ARG] = self._CalculateUrisStartArg()
|
| -
|
| - if (len(self.args) < self.command_spec[MIN_ARGS]
|
| - or len(self.args) > self.command_spec[MAX_ARGS]):
|
| + # Namedtuple public methods start with an underscore (e.g. _replace).
|
| + # pylint: disable=protected-access
|
| + self.command_spec = self.command_spec._replace(
|
| + urls_start_arg=self._CalculateUrlsStartArg())
|
| +
|
| + if (len(self.args) < self.command_spec.min_args
|
| + or len(self.args) > self.command_spec.max_args):
|
| self._RaiseWrongNumberOfArgumentsException()
|
|
|
| - if not (self.command_name in
|
| - self._commands_with_subcommands_and_subopts):
|
| + if self.command_name not in self._commands_with_subcommands_and_subopts:
|
| self.CheckArguments()
|
| -
|
| - self.proj_id_handler = ProjectIdHandler()
|
| - self.suri_builder = StorageUriBuilder(debug, bucket_storage_uri_class)
|
|
|
| + # Build the support and default maps from the command spec.
|
| + support_map = {
|
| + 'gs': self.command_spec.gs_api_support,
|
| + 's3': [ApiSelector.XML]
|
| + }
|
| + default_map = {
|
| + 'gs': self.command_spec.gs_default_api,
|
| + 's3': ApiSelector.XML
|
| + }
|
| + self.gsutil_api_map = GsutilApiMapFactory.GetApiMap(
|
| + self.gsutil_api_class_map_factory, support_map, default_map)
|
| +
|
| + self.project_id = None
|
| + self.gsutil_api = CloudApiDelegator(
|
| + bucket_storage_uri_class, self.gsutil_api_map,
|
| + self.logger, debug=self.debug)
|
| +
|
| # Cross-platform path to run gsutil binary.
|
| self.gsutil_cmd = ''
|
| - # Cross-platform list containing gsutil path for use with subprocess.
|
| - self.gsutil_exec_list = []
|
| # If running on Windows, invoke python interpreter explicitly.
|
| if gslib.util.IS_WINDOWS:
|
| self.gsutil_cmd += 'python '
|
| - self.gsutil_exec_list += ['python']
|
| # Add full path to gsutil to make sure we test the correct version.
|
| self.gsutil_path = gslib.GSUTIL_PATH
|
| self.gsutil_cmd += self.gsutil_path
|
| - self.gsutil_exec_list += [self.gsutil_path]
|
|
|
| # We're treating recursion_requested like it's used by all commands, but
|
| # only some of the commands accept the -R option.
|
| @@ -467,58 +464,63 @@
|
| self.multiprocessing_is_available = MultiprocessingIsAvailable()[0]
|
|
|
| def _RaiseWrongNumberOfArgumentsException(self):
|
| - """Raise an exception indicating that the wrong number of arguments was
|
| - provided for this command.
|
| - """
|
| - if len(self.args) > self.command_spec[MAX_ARGS]:
|
| + """Raises exception for wrong number of arguments supplied to command."""
|
| + if len(self.args) > self.command_spec.max_args:
|
| message = ('The %s command accepts at most %d arguments.' %
|
| - (self.command_name, self.command_spec[MAX_ARGS]))
|
| - elif len(self.args) < self.command_spec[MIN_ARGS]:
|
| + (self.command_name, self.command_spec.max_args))
|
| + elif len(self.args) < self.command_spec.min_args:
|
| message = ('The %s command requires at least %d arguments.' %
|
| - (self.command_name, self.command_spec[MIN_ARGS]))
|
| + (self.command_name, self.command_spec.min_args))
|
| raise CommandException(message)
|
|
|
| def CheckArguments(self):
|
| - """Checks that the arguments provided on the command line fit the
|
| - expectations of the command_spec. Any commands in
|
| - self._commands_with_subcommands_and_subopts are responsible for calling
|
| - this method after handling initial parsing of their arguments.
|
| - This prevents commands with sub-commands as well as options from breaking
|
| - the parsing of getopt.
|
| + """Checks that command line arguments match the command_spec.
|
|
|
| - TODO: Provide a function to parse commands and sub-commands more
|
| - intelligently once we stop allowing the deprecated command versions.
|
| + Any commands in self._commands_with_subcommands_and_subopts are responsible
|
| + for calling this method after handling initial parsing of their arguments.
|
| + This prevents commands with sub-commands as well as options from breaking
|
| + the parsing of getopt.
|
| +
|
| + TODO: Provide a function to parse commands and sub-commands more
|
| + intelligently once we stop allowing the deprecated command versions.
|
| +
|
| + Raises:
|
| + CommandException if the arguments don't match.
|
| """
|
|
|
| - if (not self.command_spec[FILE_URIS_OK]
|
| - and self.HaveFileUris(self.args[self.command_spec[URIS_START_ARG]:])):
|
| - raise CommandException('"%s" command does not support "file://" URIs. '
|
| - 'Did you mean to use a gs:// URI?' %
|
| + if (not self.command_spec.file_url_ok
|
| + and HaveFileUrls(self.args[self.command_spec.urls_start_arg:])):
|
| + raise CommandException('"%s" command does not support "file://" URLs. '
|
| + 'Did you mean to use a gs:// URL?' %
|
| self.command_name)
|
| - if (not self.command_spec[PROVIDER_URIS_OK]
|
| - and self._HaveProviderUris(
|
| - self.args[self.command_spec[URIS_START_ARG]:])):
|
| + if (not self.command_spec.provider_url_ok
|
| + and HaveProviderUrls(self.args[self.command_spec.urls_start_arg:])):
|
| raise CommandException('"%s" command does not support provider-only '
|
| - 'URIs.' % self.command_name)
|
| + 'URLs.' % self.command_name)
|
|
|
| - def WildcardIterator(self, uri_or_str, all_versions=False):
|
| - """
|
| - Helper to instantiate gslib.WildcardIterator. Args are same as
|
| - gslib.WildcardIterator interface, but this method fills in most of the
|
| - values from instance state.
|
| + def WildcardIterator(self, url_string, all_versions=False):
|
| + """Helper to instantiate gslib.WildcardIterator.
|
|
|
| + Args are same as gslib.WildcardIterator interface, but this method fills in
|
| + most of the values from instance state.
|
| +
|
| Args:
|
| - uri_or_str: StorageUri or URI string naming wildcard objects to iterate.
|
| + url_string: URL string naming wildcard objects to iterate.
|
| + all_versions: If true, the iterator yields all versions of objects
|
| + matching the wildcard. If false, yields just the live
|
| + object version.
|
| +
|
| + Returns:
|
| + WildcardIterator for use by caller.
|
| """
|
| - return wildcard_iterator.wildcard_iterator(
|
| - uri_or_str, self.proj_id_handler,
|
| - bucket_storage_uri_class=self.bucket_storage_uri_class,
|
| - all_versions=all_versions,
|
| - headers=self.headers, debug=self.debug)
|
| + return CreateWildcardIterator(
|
| + url_string, self.gsutil_api, all_versions=all_versions,
|
| + debug=self.debug, project_id=self.project_id)
|
|
|
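| Typical use from a subclass, assuming the IterObjects interface shown
| elsewhere in this patch (bucket name hypothetical):
|
|   for blr in self.WildcardIterator('gs://my-bucket/**').IterObjects(
|       bucket_listing_fields=['name', 'size']):
|     self.logger.info('Matched %s', blr.url_string)
|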
| def RunCommand(self):
|
| - """Abstract function in base class. Subclasses must implement this. The
|
| - return value of this function will be used as the exit status of the
|
| + """Abstract function in base class. Subclasses must implement this.
|
| +
|
| + The return value of this function will be used as the exit status of the
|
| process, so subclass commands should return an integer exit code (0 for
|
| success, a value in [1,255] for failure).
|
| """
|
| @@ -529,141 +531,174 @@
|
| # Shared helper functions that depend on base class state. #
|
| ############################################################
|
|
|
| - def UrisAreForSingleProvider(self, uri_args):
|
| - """Tests whether the uris are all for a single provider.
|
| + def ApplyAclFunc(self, acl_func, acl_excep_handler, url_strs):
|
| + """Sets the standard or default object ACL depending on self.command_name.
|
|
|
| - Returns: a StorageUri for one of the uris on success, None on failure.
|
| + Args:
|
| + acl_func: ACL function to be passed to Apply.
|
| + acl_excep_handler: ACL exception handler to be passed to Apply.
|
| + url_strs: URL strings on which to set ACL.
|
| +
|
| + Raises:
|
| + CommandException if an ACL could not be set.
|
| """
|
| - provider = None
|
| - uri = None
|
| - for uri_str in uri_args:
|
| - # validate=False because we allow wildcard uris.
|
| - uri = boto.storage_uri(
|
| - uri_str, debug=self.debug, validate=False,
|
| - bucket_storage_uri_class=self.bucket_storage_uri_class)
|
| - if not provider:
|
| - provider = uri.scheme
|
| - elif uri.scheme != provider:
|
| - return None
|
| - return uri
|
| -
|
| - def _SetAclFunc(self, name_expansion_result):
|
| - exp_src_uri = self.suri_builder.StorageUri(
|
| - name_expansion_result.GetExpandedUriStr())
|
| - # We don't do bucket operations multi-threaded (see comment below).
|
| - assert self.command_name != 'defacl'
|
| - self.logger.info('Setting ACL on %s...' %
|
| - name_expansion_result.expanded_uri_str)
|
| - try:
|
| - if self.canned:
|
| - exp_src_uri.set_acl(self.acl_arg, exp_src_uri.object_name, False,
|
| - self.headers)
|
| + multi_threaded_url_args = []
|
| + # Handle bucket ACL setting operations single-threaded, because
|
| + # our threading machinery currently assumes it's working with objects
|
| + # (name_expansion_iterator), and normally we wouldn't expect users to need
|
| + # to set ACLs on huge numbers of buckets at once anyway.
|
| + for url_str in url_strs:
|
| + url = StorageUrlFromString(url_str)
|
| + if url.IsCloudUrl() and url.IsBucket():
|
| + if self.recursion_requested:
|
| + # If user specified -R option, convert any bucket args to bucket
|
| + # wildcards (e.g., gs://bucket/*), to prevent the operation from
|
| + # being applied to the buckets themselves.
|
| + url.object_name = '*'
|
| + multi_threaded_url_args.append(url.url_string)
|
| + else:
|
| + # Convert to a NameExpansionResult so we can re-use the threaded
|
| + # function for the single-threaded implementation. RefType is unused.
|
| + for blr in self.WildcardIterator(url.url_string).IterBuckets(
|
| + bucket_fields=['id']):
|
| + name_expansion_for_url = NameExpansionResult(
|
| + url, False, False, blr.storage_url)
|
| + acl_func(self, name_expansion_for_url)
|
| else:
|
| - exp_src_uri.set_xml_acl(self.acl_arg, exp_src_uri.object_name, False,
|
| - self.headers)
|
| - except GSResponseError as e:
|
| - if self.continue_on_error:
|
| - exc_name, message, detail = util.ParseErrorDetail(e)
|
| - self.everything_set_okay = False
|
| - sys.stderr.write(util.FormatErrorMessage(
|
| - exc_name, e.status, e.code, e.reason, message, detail))
|
| - else:
|
| + multi_threaded_url_args.append(url_str)
|
| +
|
| + if len(multi_threaded_url_args) >= 1:
|
| + name_expansion_iterator = NameExpansionIterator(
|
| + self.command_name, self.debug,
|
| + self.logger, self.gsutil_api,
|
| + multi_threaded_url_args, self.recursion_requested,
|
| + all_versions=self.all_versions,
|
| + continue_on_error=self.continue_on_error or self.parallel_operations)
|
| +
|
| + # Perform requests in parallel (-m) mode, if requested, using
|
| + # configured number of parallel processes and threads. Otherwise,
|
| + # perform requests with sequential function calls in current process.
|
| + self.Apply(acl_func, name_expansion_iterator, acl_excep_handler,
|
| + fail_on_error=not self.continue_on_error)
|
| +
|
| + if not self.everything_set_okay and not self.continue_on_error:
|
| + raise CommandException('ACLs for some objects could not be set.')
|
| +
|
| + def SetAclFunc(self, name_expansion_result, thread_state=None):
|
| + """Sets the object ACL for the name_expansion_result provided.
|
| +
|
| + Args:
|
| + name_expansion_result: NameExpansionResult describing the target object.
|
| + thread_state: If present, use this gsutil Cloud API instance for the set.
|
| + """
|
| + if thread_state:
|
| + assert not self.def_acl
|
| + gsutil_api = thread_state
|
| + else:
|
| + gsutil_api = self.gsutil_api
|
| + op_string = 'default object ACL' if self.def_acl else 'ACL'
|
| + url = name_expansion_result.expanded_storage_url
|
| + self.logger.info('Setting %s on %s...', op_string, url)
|
| + if ((gsutil_api.GetApiSelector(url.scheme) == ApiSelector.XML
|
| + and url.scheme != 'gs') or self.canned):
|
| + # If we are using canned ACLs or interacting with a non-google ACL
|
| + # model, we need to use the XML passthrough. acl_arg should either
|
| + # be a canned ACL or an XML ACL.
|
| + try:
|
| + # No canned ACL support in JSON; force the XML API to be used.
|
| + orig_prefer_api = gsutil_api.prefer_api
|
| + gsutil_api.prefer_api = ApiSelector.XML
|
| + gsutil_api.XmlPassThroughSetAcl(
|
| + self.acl_arg, url, canned=self.canned,
|
| + def_obj_acl=self.def_acl, provider=url.scheme)
|
| + gsutil_api.prefer_api = orig_prefer_api
|
| + except ServiceException as e:
|
| + if self.continue_on_error:
|
| + self.everything_set_okay = False
|
| + self.logger.error(e)
|
| + else:
|
| + raise
|
| + else: # Normal Cloud API path. ACL is a JSON ACL.
|
| + try:
|
| + if url.IsBucket():
|
| + if self.def_acl:
|
| + def_obj_acl = AclTranslation.JsonToMessage(
|
| + self.acl_arg, apitools_messages.ObjectAccessControl)
|
| + bucket_metadata = apitools_messages.Bucket(
|
| + defaultObjectAcl=def_obj_acl)
|
| + gsutil_api.PatchBucket(url.bucket_name, bucket_metadata,
|
| + provider=url.scheme, fields=['id'])
|
| + else:
|
| + bucket_acl = AclTranslation.JsonToMessage(
|
| + self.acl_arg, apitools_messages.BucketAccessControl)
|
| + bucket_metadata = apitools_messages.Bucket(acl=bucket_acl)
|
| + gsutil_api.PatchBucket(url.bucket_name, bucket_metadata,
|
| + provider=url.scheme, fields=['id'])
|
| + else: # url.IsObject()
|
| + object_acl = AclTranslation.JsonToMessage(
|
| + self.acl_arg, apitools_messages.ObjectAccessControl)
|
| + object_metadata = apitools_messages.Object(acl=object_acl)
|
| + gsutil_api.PatchObjectMetadata(url.bucket_name, url.object_name,
|
| + object_metadata, provider=url.scheme,
|
| + generation=url.generation)
|
| + except ArgumentException, e:
|
| raise
|
| + except ServiceException, e:
|
| + raise
|
|
|
| - def SetAclCommandHelper(self):
|
| + def SetAclCommandHelper(self, acl_func, acl_excep_handler):
|
| + """Sets ACLs on the self.args using the passed-in acl function.
|
| +
|
| + Args:
|
| + acl_func: ACL function to be passed to Apply.
|
| + acl_excep_handler: ACL exception handler to be passed to Apply.
|
| """
|
| - Common logic for setting ACLs. Sets the standard ACL or the default
|
| - object ACL depending on self.command_name.
|
| - """
|
| -
|
| - self.acl_arg = self.args[0]
|
| - uri_args = self.args[1:]
|
| - # Disallow multi-provider acl set requests, because there are differences in
|
| + acl_arg = self.args[0]
|
| + url_args = self.args[1:]
|
| + # Disallow multi-provider setacl requests, because there are differences in
|
| # the ACL models.
|
| - storage_uri = self.UrisAreForSingleProvider(uri_args)
|
| - if not storage_uri:
|
| + if not UrlsAreForSingleProvider(url_args):
|
| raise CommandException('"%s" command spanning providers not allowed.' %
|
| self.command_name)
|
|
|
| # Determine whether acl_arg names a file containing XML ACL text vs. the
|
| # string name of a canned ACL.
|
| - if os.path.isfile(self.acl_arg):
|
| - with codecs.open(self.acl_arg, 'r', 'utf-8') as f:
|
| - self.acl_arg = f.read()
|
| + if os.path.isfile(acl_arg):
|
| + with codecs.open(acl_arg, 'r', UTF8) as f:
|
| + acl_arg = f.read()
|
| self.canned = False
|
| else:
|
| # No file exists, so expect a canned ACL string.
|
| + # Canned ACLs are not supported in JSON and we need to use the XML API
|
| + # to set them.
|
| + # validate=False because we allow wildcard urls.
|
| + storage_uri = boto.storage_uri(
|
| + url_args[0], debug=self.debug, validate=False,
|
| + bucket_storage_uri_class=self.bucket_storage_uri_class)
|
| +
|
| canned_acls = storage_uri.canned_acls()
|
| - if self.acl_arg not in canned_acls:
|
| - raise CommandException('Invalid canned ACL "%s".' % self.acl_arg)
|
| + if acl_arg not in canned_acls:
|
| + raise CommandException('Invalid canned ACL "%s".' % acl_arg)
|
| self.canned = True
|
|
|
| # Used to track if any ACLs failed to be set.
|
| self.everything_set_okay = True
|
| + self.acl_arg = acl_arg
|
|
|
| - # If user specified -R option, convert any bucket args to bucket wildcards
|
| - # (e.g., gs://bucket/*), to prevent the operation from being applied to
|
| - # the buckets themselves.
|
| - if self.recursion_requested:
|
| - for i in range(len(uri_args)):
|
| - uri = self.suri_builder.StorageUri(uri_args[i])
|
| - if uri.names_bucket():
|
| - uri_args[i] = uri.clone_replace_name('*').uri
|
| - else:
|
| - # Handle bucket ACL setting operations single-threaded, because
|
| - # our threading machinery currently assumes it's working with objects
|
| - # (name_expansion_iterator), and normally we wouldn't expect users to need
|
| - # to set ACLs on huge numbers of buckets at once anyway.
|
| - for i in range(len(uri_args)):
|
| - uri_str = uri_args[i]
|
| - if self.suri_builder.StorageUri(uri_str).names_bucket():
|
| - self._RunSingleThreadedSetAcl(self.acl_arg, uri_args)
|
| - return
|
| -
|
| - name_expansion_iterator = NameExpansionIterator(
|
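| For illustration, a subclass under the new scheme replaces its old
| command_spec dict with a spec built by this factory (command name and option
| values below are hypothetical):
|
|   class FooCommand(Command):
|     """Hypothetical command illustrating the new spec style."""
|     command_spec = Command.CreateCommandSpec(
|         'foo',
|         min_args=1,
|         supported_sub_args='rp:',
|         file_url_ok=True,
|         gs_api_support=[ApiSelector.XML, ApiSelector.JSON],
|         gs_default_api=ApiSelector.JSON)
|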
| - self.command_name, self.proj_id_handler, self.headers, self.debug,
|
| - self.logger, self.bucket_storage_uri_class, uri_args,
|
| - self.recursion_requested, self.recursion_requested,
|
| - all_versions=self.all_versions)
|
| - # Perform requests in parallel (-m) mode, if requested, using
|
| - # configured number of parallel processes and threads. Otherwise,
|
| - # perform requests with sequential function calls in current process.
|
| - self.Apply(_SetAclFuncWrapper, name_expansion_iterator,
|
| - _SetAclExceptionHandler)
|
| -
|
| + self.ApplyAclFunc(acl_func, acl_excep_handler, url_args)
|
| if not self.everything_set_okay and not self.continue_on_error:
|
| raise CommandException('ACLs for some objects could not be set.')
|
|
|
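| A subclass's RunCommand can then drive the whole set operation with the
| module-level wrapper and handler defined earlier in this file, roughly:
|
|   def RunCommand(self):
|     """Sketch of an acl-set style command body (sub-opt parsing elided)."""
|     self.SetAclCommandHelper(SetAclFuncWrapper, SetAclExceptionHandler)
|     return 0 if self.everything_set_okay else 1
|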
| - def _RunSingleThreadedSetAcl(self, acl_arg, uri_args):
|
| - some_matched = False
|
| - for uri_str in uri_args:
|
| - for blr in self.WildcardIterator(uri_str):
|
| - if blr.HasPrefix():
|
| - continue
|
| - some_matched = True
|
| - uri = blr.GetUri()
|
| - if self.command_name == 'defacl':
|
| - self.logger.info('Setting default object ACL on %s...', uri)
|
| - if self.canned:
|
| - uri.set_def_acl(acl_arg, uri.object_name, False, self.headers)
|
| - else:
|
| - uri.set_def_xml_acl(acl_arg, False, self.headers)
|
| - else:
|
| - self.logger.info('Setting ACL on %s...', uri)
|
| - if self.canned:
|
| - uri.set_acl(acl_arg, uri.object_name, False, self.headers)
|
| - else:
|
| - uri.set_xml_acl(acl_arg, uri.object_name, False, self.headers)
|
| - if not some_matched:
|
| - raise CommandException('No URIs matched')
|
| -
|
| def _WarnServiceAccounts(self):
|
| - """Warns service account users who have received an AccessDenied error for
|
| - one of the metadata-related commands to make sure that they are listed as
|
| - Owners in the API console."""
|
| + """Warns service account users who have received an AccessDenied error.
|
|
|
| - # Import this here so that the value will be set first in oauth2_plugin.
|
| - from gslib.third_party.oauth2_plugin.oauth2_plugin import IS_SERVICE_ACCOUNT
|
| + When one of the metadata-related commands fails due to AccessDenied, the
|
| + user must ensure that they are listed as an Owner in the API console.
|
| + """
|
| + # Import this here so that the value will be set first in
|
| + # gcs_oauth2_boto_plugin.
|
| + # pylint: disable=g-import-not-at-top
|
| + from gcs_oauth2_boto_plugin.oauth2_plugin import IS_SERVICE_ACCOUNT
|
|
|
| if IS_SERVICE_ACCOUNT:
|
| # This method is only called when canned ACLs are used, so the warning
|
| @@ -676,58 +711,91 @@
|
| 'address is listed as an Owner in the Team tab of the API console. '
|
| 'See "gsutil help creds" for further information.\n')))
|
|
|
| - def GetAclCommandHelper(self):
|
| - """Common logic for getting ACLs. Gets the standard ACL or the default
|
| - object ACL depending on self.command_name."""
|
| + def GetAndPrintAcl(self, url_str):
|
| + """Prints the standard or default object ACL depending on self.command_name.
|
|
|
| - # Resolve to just one object.
|
| - # Handle wildcard-less URI specially in case this is a version-specific
|
| - # URI, because WildcardIterator().IterUris() would lose the versioning info.
|
| - if not ContainsWildcard(self.args[0]):
|
| - uri = self.suri_builder.StorageUri(self.args[0])
|
| + Args:
|
| + url_str: URL string to get ACL for.
|
| + """
|
| + blr = self.GetAclCommandBucketListingReference(url_str)
|
| + url = StorageUrlFromString(url_str)
|
| + if (self.gsutil_api.GetApiSelector(url.scheme) == ApiSelector.XML
|
| + and url.scheme != 'gs'):
|
| + # Need to use XML passthrough.
|
| + try:
|
| + acl = self.gsutil_api.XmlPassThroughGetAcl(
|
| + url, def_obj_acl=self.def_acl, provider=url.scheme)
|
| + print acl.to_xml()
|
| + except AccessDeniedException, _:
|
| + self._WarnServiceAccounts()
|
| + raise
|
| else:
|
| - uris = list(self.WildcardIterator(self.args[0]).IterUris())
|
| - if len(uris) == 0:
|
| - raise CommandException('No URIs matched')
|
| - if len(uris) != 1:
|
| - raise CommandException('%s matched more than one URI, which is not '
|
| - 'allowed by the %s command' % (self.args[0], self.command_name))
|
| - uri = uris[0]
|
| - if not uri.names_bucket() and not uri.names_object():
|
| - raise CommandException('"%s" command must specify a bucket or '
|
| - 'object.' % self.command_name)
|
| - if self.command_name == 'defacl':
|
| - acl = uri.get_def_acl(False, self.headers)
|
| - else:
|
| - acl = uri.get_acl(False, self.headers)
|
| - # Pretty-print the XML to make it more easily human editable.
|
| - parsed_xml = xml.dom.minidom.parseString(acl.to_xml().encode('utf-8'))
|
| - print parsed_xml.toprettyxml(indent=' ').encode('utf-8')
|
| + if self.command_name == 'defacl':
|
| + acl = blr.root_object.defaultObjectAcl
|
| + if not acl:
|
| + self.logger.warn(
|
| + 'No default object ACL present for %s. This could occur if '
|
| + 'the default object ACL is private, in which case objects '
|
| + 'created in this bucket will be readable only by their '
|
| + 'creators. It could also mean you do not have OWNER permission '
|
| + 'on %s and therefore do not have permission to read the '
|
| + 'default object ACL.', url_str, url_str)
|
| + else:
|
| + acl = blr.root_object.acl
|
| + if not acl:
|
| + self._WarnServiceAccounts()
|
| + raise AccessDeniedException('Access denied. Please ensure you have '
|
| + 'OWNER permission on %s.' % url_str)
|
| + print AclTranslation.JsonFromMessage(acl)
|
|
|
| - def GetXmlSubresource(self, subresource, uri_arg):
|
| - """Print an xml subresource, e.g. logging, for a bucket/object.
|
| + def GetAclCommandBucketListingReference(self, url_str):
|
| + """Gets a single bucket listing reference for an acl get command.
|
|
|
| Args:
|
| - subresource: The subresource name.
|
| - uri_arg: URI for the bucket/object. Wildcards will be expanded.
|
| + url_str: URL string to get the bucket listing reference for.
|
|
|
| + Returns:
|
| + BucketListingReference for the URL string.
|
| +
|
| Raises:
|
| - CommandException: if errors encountered.
|
| + CommandException if string did not result in exactly one reference.
|
| """
|
| - # Wildcarding is allowed but must resolve to just one bucket.
|
| - uris = list(self.WildcardIterator(uri_arg).IterUris())
|
| - if len(uris) != 1:
|
| - raise CommandException('Wildcards must resolve to exactly one item for '
|
| - 'get %s' % subresource)
|
| - uri = uris[0]
|
| - xml_str = uri.get_subresource(subresource, False, self.headers)
|
| - # Pretty-print the XML to make it more easily human editable.
|
| - parsed_xml = xml.dom.minidom.parseString(xml_str.encode('utf-8'))
|
| - print parsed_xml.toprettyxml(indent=' ')
|
| + # The caller guarantees that url_str is the appropriate type of URL
|
| + # string for this call (e.g., getdefacl will never be called with an
|
| + # object URL string).
|
| + wildcard_url = StorageUrlFromString(url_str)
|
| + if wildcard_url.IsObject():
|
| + plurality_iter = PluralityCheckableIterator(
|
| + self.WildcardIterator(url_str).IterObjects(
|
| + bucket_listing_fields=['acl']))
|
| + else:
|
| + # Bucket or provider. We call IterBuckets explicitly here to ensure that
|
| + # the root object is populated with the acl.
|
| + if self.command_name == 'defacl':
|
| + bucket_fields = ['defaultObjectAcl']
|
| + else:
|
| + bucket_fields = ['acl']
|
| + plurality_iter = PluralityCheckableIterator(
|
| + self.WildcardIterator(url_str).IterBuckets(
|
| + bucket_fields=bucket_fields))
|
| + if plurality_iter.IsEmpty():
|
| + raise CommandException('No URLs matched')
|
| + if plurality_iter.HasPlurality():
|
| + raise CommandException(
|
| + '%s matched more than one URL, which is not allowed by the %s '
|
| + 'command' % (url_str, self.command_name))
|
| + return list(plurality_iter)[0]
|
|
|
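| PluralityCheckableIterator lets the code above test for zero or multiple
| results without consuming them; its contract in miniature (a sketch):
|
|   it = PluralityCheckableIterator(iter(['gs://bucket/obj']))
|   assert not it.IsEmpty()        # peeks without consuming
|   assert not it.HasPlurality()   # exactly one element is buffered
|   only_blr = list(it)[0]         # buffered elements are still yielded
|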
| - def _HandleMultiProcessingControlC(self, signal_num, cur_stack_frame):
|
| - """Called when user hits ^C during a multi-processing/multi-threaded
|
| - request, so we can kill the subprocesses."""
|
| + def _HandleMultiProcessingControlC(self, unused_signal_num,
|
| + unused_cur_stack_frame):
|
| + """Called when user hits ^C during a multi-process/multi-thread request.
|
| +
|
| + Kills subprocesses.
|
| +
|
| + Args:
|
| + unused_signal_num: signal generated by ^C.
|
| + unused_cur_stack_frame: Current stack frame.
|
| + """
|
| # Note: This only works under Linux/MacOS. See
|
| # https://github.com/GoogleCloudPlatform/gsutil/issues/99 for details
|
| # about why making it work correctly across OS's is harder and still open.
|
| @@ -736,38 +804,57 @@
|
| # Simply calling sys.exit(1) doesn't work - see above bug for details.
|
| KillProcess(os.getpid())
|
|
|
| - def HaveFileUris(self, args_to_check):
|
| - """Checks whether args_to_check contain any file URIs.
|
| + def GetSingleBucketUrlFromArg(self, arg, bucket_fields=None):
|
| + """Gets a single bucket URL based on the command arguments.
|
|
|
| Args:
|
| - args_to_check: Command-line argument subset to check.
|
| + arg: String argument to get bucket URL for.
|
| + bucket_fields: Fields to populate for the bucket.
|
|
|
| Returns:
|
| - True if args_to_check contains any file URIs.
|
| + (StorageUrl referring to a single bucket, Bucket metadata).
|
| +
|
| + Raises:
|
| + CommandException if args did not match exactly one bucket.
|
| """
|
| - for uri_str in args_to_check:
|
| - if uri_str.lower().startswith('file://') or uri_str.find(':') == -1:
|
| - return True
|
| - return False
|
| + plurality_checkable_iterator = self.GetBucketUrlIterFromArg(
|
| + arg, bucket_fields=bucket_fields)
|
| + if plurality_checkable_iterator.HasPlurality():
|
| + raise CommandException(
|
| + '%s matched more than one URL, which is not\n'
|
| + 'allowed by the %s command' % (arg, self.command_name))
|
| + blr = list(plurality_checkable_iterator)[0]
|
| + return StorageUrlFromString(blr.url_string), blr.root_object
|
|
|
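| A typical caller in a bucket-level command might look like the following
| sketch (the 'versioning' bucket field is an assumption about the apitools
| Bucket message):
|
|   bucket_url, bucket_metadata = self.GetSingleBucketUrlFromArg(
|       'gs://my-bucket', bucket_fields=['id', 'versioning'])
|   self.logger.info('%s versioning: %s', bucket_url.bucket_name,
|                    bucket_metadata.versioning)
|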
| - ######################
|
| - # Private functions. #
|
| - ######################
|
| + def GetBucketUrlIterFromArg(self, arg, bucket_fields=None):
|
| + """Gets a single bucket URL based on the command arguments.
|
|
|
| - def _HaveProviderUris(self, args_to_check):
|
| - """Checks whether args_to_check contains any provider URIs (like 'gs://').
|
| -
|
| Args:
|
| - args_to_check: Command-line argument subset to check.
|
| + arg: String argument to iterate over.
|
| + bucket_fields: Fields to populate for the bucket.
|
|
|
| Returns:
|
| - True if args_to_check contains any provider URIs.
|
| + PluralityCheckableIterator over buckets.
|
| +
|
| + Raises:
|
| + CommandException if iterator matched no buckets.
|
| """
|
| - for uri_str in args_to_check:
|
| - if re.match('^[a-z]+://$', uri_str):
|
| - return True
|
| - return False
|
| -
|
| + arg_url = StorageUrlFromString(arg)
|
| + if not arg_url.IsCloudUrl() or arg_url.IsObject():
|
| + raise CommandException('"%s" command must specify a bucket' %
|
| + self.command_name)
|
| +
|
| + plurality_checkable_iterator = PluralityCheckableIterator(
|
| + self.WildcardIterator(arg).IterBuckets(
|
| + bucket_fields=bucket_fields))
|
| + if plurality_checkable_iterator.IsEmpty():
|
| + raise CommandException('No URLs matched')
|
| + return plurality_checkable_iterator
|
| +
|
| + ######################
|
| + # Private functions. #
|
| + ######################
|
| +
|
| def _ResetConnectionPool(self):
|
| # Each OS process needs to establish its own set of connections to
|
| # the server to avoid writes from different OS processes interleaving
|
| @@ -781,9 +868,10 @@
|
|
|
| def _GetProcessAndThreadCount(self, process_count, thread_count,
|
| parallel_operations_override):
|
| - """
|
| - Determines the values of process_count and thread_count that we should
|
| - actually use. If we're not performing operations in parallel, then ignore
|
| + """Determines the values of process_count and thread_count.
|
| +
|
| + These values are used for parallel operations.
|
| + If we're not performing operations in parallel, then ignore
|
| existing values and use process_count = thread_count = 1.
|
|
|
| Args:
|
| @@ -822,23 +910,23 @@
|
| thread_count = 1
|
|
|
| if IS_WINDOWS and process_count > 1:
|
| - raise CommandException('\n'.join(textwrap.wrap((
|
| - 'It is not possible to set process_count > 1 on Windows. Please '
|
| - 'update your config file (located at %s) and set '
|
| - '"process_count = 1".') %
|
| + raise CommandException('\n'.join(textwrap.wrap(
|
| + ('It is not possible to set process_count > 1 on Windows. Please '
|
| + 'update your config file (located at %s) and set '
|
| + '"parallel_process_count = 1".') %
|
| GetConfigFilePath())))
|
| self.logger.debug('process count: %d', process_count)
|
| self.logger.debug('thread count: %d', thread_count)
|
| -
|
| +
|
| return (process_count, thread_count)
|
|
|
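| When -m is in effect, the incoming counts are typically sourced from the
| user's boto config before this method normalizes them; the key names appear
| elsewhere in this patch, while the fallback values here are illustrative:
|
|   process_count = boto.config.getint('GSUtil', 'parallel_process_count', 1)
|   thread_count = boto.config.getint('GSUtil', 'parallel_thread_count', 10)
|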
| def _SetUpPerCallerState(self):
|
| """Set up the state for a caller id, corresponding to one Apply call."""
|
| # Get a new caller ID.
|
| with caller_id_lock:
|
| - caller_id_counter.value = caller_id_counter.value + 1
|
| + caller_id_counter.value += 1
|
| caller_id = caller_id_counter.value
|
| -
|
| +
|
| # Create a copy of self with an incremented recursive level. This allows
|
| # the class to report its level correctly if the function called from it
|
| # also needs to call Apply.
|
| @@ -849,12 +937,16 @@
|
| # recreate it later in the WorkerThread. This is not a problem since any
|
| # logger with the same name will be treated as a singleton.
|
| cls.logger = None
|
| +
|
| + # Likewise, the default API connection can't be pickled, but it is unused
|
| + # anyway as each thread gets its own API delegator.
|
| + cls.gsutil_api = None
|
| +
|
| class_map[caller_id] = cls
|
| -
|
| total_tasks[caller_id] = -1 # -1 => the producer hasn't finished yet.
|
| call_completed_map[caller_id] = False
|
| - caller_id_finished_count.put(caller_id, 0)
|
| - global_return_values_map.put(caller_id, [])
|
| + caller_id_finished_count.Put(caller_id, 0)
|
| + global_return_values_map.Put(caller_id, [])
|
| return caller_id
|
|
|
| def _CreateNewConsumerPool(self, num_processes, num_threads):
|
| @@ -863,14 +955,14 @@
|
| task_queue = _NewMultiprocessingQueue()
|
| task_queues.append(task_queue)
|
|
|
| - current_max_recursive_level.value = current_max_recursive_level.value + 1
|
| + current_max_recursive_level.value += 1
|
| if current_max_recursive_level.value > MAX_RECURSIVE_DEPTH:
|
| raise CommandException('Recursion depth of Apply calls is too great.')
|
| - for shard in range(num_processes):
|
| + for _ in range(num_processes):
|
| recursive_apply_level = len(consumer_pools)
|
| p = multiprocessing.Process(
|
| target=self._ApplyThreads,
|
| - args=(num_threads, recursive_apply_level, shard))
|
| + args=(num_threads, num_processes, recursive_apply_level))
|
| p.daemon = True
|
| processes.append(p)
|
| p.start()
|
| @@ -878,14 +970,11 @@
|
| consumer_pools.append(consumer_pool)
|
|
|
| def Apply(self, func, args_iterator, exception_handler,
|
| - shared_attrs=None, arg_checker=_UriArgChecker,
|
| + shared_attrs=None, arg_checker=_UrlArgChecker,
|
| parallel_operations_override=False, process_count=None,
|
| thread_count=None, should_return_results=False,
|
| fail_on_error=False):
|
| - """
|
| - Determines whether the necessary parts of the multiprocessing module are
|
| - available, and delegates to _ParallelApply or _SequentialApply as
|
| - appropriate.
|
| + """Calls _Parallel/SequentialApply based on multiprocessing availability.
|
|
|
| Args:
|
| func: Function to call to process each argument.
|
| @@ -908,6 +997,9 @@
|
| fail_on_error: If true, then raise any exceptions encountered when
|
| executing func. This is only applicable in the case of
|
| process_count == thread_count == 1.
|
| +
|
| + Returns:
|
| + Results from spawned threads.
|
| """
|
| if shared_attrs:
|
| original_shared_vars_values = {} # We'll add these back in at the end.
|
| @@ -919,13 +1011,14 @@
|
|
|
| (process_count, thread_count) = self._GetProcessAndThreadCount(
|
| process_count, thread_count, parallel_operations_override)
|
| +
|
| is_main_thread = (self.recursive_apply_level == 0
|
| and self.sequential_caller_id == -1)
|
|
|
| # We don't honor the fail_on_error flag in the case of multiple threads
|
| # or processes.
|
| fail_on_error = fail_on_error and (process_count * thread_count == 1)
|
| -
|
| +
|
| # Only check this from the first call in the main thread. Apart from the
|
| # fact that it's wasteful to try this multiple times in general, it also
|
| # will never work when called from a subprocess since we use daemon
|
| @@ -945,31 +1038,31 @@
|
| else:
|
| self.sequential_caller_id += 1
|
| caller_id = self.sequential_caller_id
|
| -
|
| +
|
| if is_main_thread:
|
| - global global_return_values_map, shared_vars_map
|
| + # pylint: disable=global-variable-undefined
|
| + global global_return_values_map, shared_vars_map, failure_count
|
| global caller_id_finished_count, shared_vars_list_map
|
| global_return_values_map = BasicIncrementDict()
|
| - global_return_values_map.put(caller_id, [])
|
| + global_return_values_map.Put(caller_id, [])
|
| shared_vars_map = BasicIncrementDict()
|
| caller_id_finished_count = BasicIncrementDict()
|
| shared_vars_list_map = {}
|
| + failure_count = 0
|
|
|
| -
|
| # If any shared attributes passed by caller, create a dictionary of
|
| # shared memory variables for every element in the list of shared
|
| # attributes.
|
| if shared_attrs:
|
| shared_vars_list_map[caller_id] = shared_attrs
|
| for name in shared_attrs:
|
| - shared_vars_map.put((caller_id, name), 0)
|
| + shared_vars_map.Put((caller_id, name), 0)
|
|
|
| # Make all of the requested function calls.
|
| if self.multiprocessing_is_available and thread_count * process_count > 1:
|
| self._ParallelApply(func, args_iterator, exception_handler, caller_id,
|
| - arg_checker, parallel_operations_override,
|
| - process_count, thread_count, should_return_results,
|
| - fail_on_error)
|
| + arg_checker, process_count, thread_count,
|
| + should_return_results, fail_on_error)
|
| else:
|
| self._SequentialApply(func, args_iterator, exception_handler, caller_id,
|
| arg_checker, should_return_results, fail_on_error)
|
| @@ -978,61 +1071,89 @@
|
| for name in shared_attrs:
|
| # This allows us to retain the original value of the shared variable,
|
| # and simply apply the delta after what was done during the call to
|
| - # apply.
|
| + # apply.
|
| final_value = (original_shared_vars_values[name] +
|
| - shared_vars_map.get((caller_id, name)))
|
| + shared_vars_map.Get((caller_id, name)))
|
| setattr(self, name, final_value)
|
|
|
| if should_return_results:
|
| - return global_return_values_map.get(caller_id)
|
| + return global_return_values_map.Get(caller_id)
|
|
|
| + def _MaybeSuggestGsutilDashM(self):
|
| + """Outputs a sugestion to the user to use gsutil -m."""
|
| + if not (boto.config.getint('GSUtil', 'parallel_process_count', 0) == 1 and
|
| + boto.config.getint('GSUtil', 'parallel_thread_count', 0) == 1):
|
| + self.logger.warn('\n' + textwrap.fill(
|
| + '==> NOTE: You are performing a sequence of gsutil operations that '
|
| + 'may run significantly faster if you instead use gsutil -m %s ...\n'
|
| + 'Please see the -m section under "gsutil help options" for further '
|
| + 'information about when gsutil -m can be advantageous.'
|
| + % sys.argv[1]) + '\n')
|
| +
|
| + # pylint: disable=g-doc-args
|
| def _SequentialApply(self, func, args_iterator, exception_handler, caller_id,
|
| arg_checker, should_return_results, fail_on_error):
|
| + """Performs all function calls sequentially in the current thread.
|
| +
|
| + No other threads or processes will be spawned. This degraded functionality
|
| + is used when the multiprocessing module is not available or the user
|
| + requests only one thread and one process.
|
| """
|
| - Perform all function calls sequentially in the current thread. No other
|
| - threads or processes will be spawned. This degraded functionality is only
|
| - for use when the multiprocessing module is not available for some reason.
|
| - """
|
| -
|
| # Create a WorkerThread to handle all of the logic needed to actually call
|
| # the function. Note that this thread will never be started, and all work
|
| # is done in the current thread.
|
| worker_thread = WorkerThread(None, False)
|
| args_iterator = iter(args_iterator)
|
| + # Count of sequential calls that have been made. Used for producing
|
| + # suggestion to use gsutil -m.
|
| + sequential_call_count = 0
|
| while True:
|
| -
|
| +
|
| # Try to get the next argument, handling any exceptions that arise.
|
| try:
|
| args = args_iterator.next()
|
| except StopIteration, e:
|
| break
|
| - except Exception, e:
|
| + except Exception, e: # pylint: disable=broad-except
|
| + _IncrementFailureCount()
|
| if fail_on_error:
|
| raise
|
| else:
|
| try:
|
| exception_handler(self, e)
|
| - except Exception, e1:
|
| + except Exception, _: # pylint: disable=broad-except
|
| self.logger.debug(
|
| 'Caught exception while handling exception for %s:\n%s',
|
| func, traceback.format_exc())
|
| continue
|
| +
|
| + sequential_call_count += 1
|
| + if sequential_call_count == OFFER_GSUTIL_M_SUGGESTION_THRESHOLD:
|
| + # Output suggestion near beginning of run, so user sees it early and can
|
| + # ^C and try gsutil -m.
|
| + self._MaybeSuggestGsutilDashM()
|
| if arg_checker(self, args):
|
| # Now that we actually have the next argument, perform the task.
|
| task = Task(func, args, caller_id, exception_handler,
|
| should_return_results, arg_checker, fail_on_error)
|
| worker_thread.PerformTask(task, self)
|
| + if sequential_call_count >= gslib.util.GetTermLines():
|
| + # Output suggestion at end of long run, in case user missed it at the
|
| + # start and it scrolled off-screen.
|
| + self._MaybeSuggestGsutilDashM()
|
|
|
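_SequentialApply boils down to: pull arguments one at a time, route failures through the caller's handler, and surface the -m hint both early and late. A condensed, self-contained model of that control flow; note the real code delegates the actual call (and its error handling) to WorkerThread.PerformTask and keys the trailing hint to terminal height via gslib.util.GetTermLines(), both simplified away here:

    SUGGESTION_THRESHOLD = 5  # mirrors OFFER_GSUTIL_M_SUGGESTION_THRESHOLD

    def sequential_apply(func, args_iterator, exception_handler, fail_on_error,
                         suggest=lambda: None):
        call_count = 0
        it = iter(args_iterator)
        while True:
            try:
                args = next(it)
            except StopIteration:
                break
            except Exception as e:   # the iterator itself failed
                if fail_on_error:
                    raise
                exception_handler(e)
                continue
            call_count += 1
            if call_count == SUGGESTION_THRESHOLD:
                suggest()             # early, so the user can ^C and rerun with -m
            func(args)                # real code: worker_thread.PerformTask(task, self)
        if call_count >= SUGGESTION_THRESHOLD:
            suggest()                 # repeated at the end of long runs

    calls = []
    sequential_apply(calls.append, range(7), lambda e: None, False)
    assert calls == list(range(7))
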
| + # pylint: disable=g-doc-args
|
| def _ParallelApply(self, func, args_iterator, exception_handler, caller_id,
|
| - arg_checker, parallel_operations_override, process_count,
|
| - thread_count, should_return_results, fail_on_error):
|
| - """
|
| - Dispatch input arguments across a pool of parallel OS
|
| - processes and/or Python threads, based on options (-m or not)
|
| - and settings in the user's config file. If non-parallel mode
|
| - or only one OS process requested, execute requests sequentially
|
| - in the current OS process.
|
| -
|
| + arg_checker, process_count, thread_count,
|
| + should_return_results, fail_on_error):
|
| + """Dispatches input arguments across a thread/process pool.
|
| +
|
| + Pools are composed of parallel OS processes and/or Python threads,
|
| + based on options (-m or not) and settings in the user's config file.
|
| +
|
| + If only one OS process is requested/available, dispatch requests across
|
| + threads in the current OS process.
|
| +
|
| In the multi-process case, we will create one pool of worker processes for
|
| each level of the tree of recursive calls to Apply. E.g., if A calls
|
| Apply(B), and B ultimately calls Apply(C) followed by Apply(D), then we
|
| @@ -1050,7 +1171,7 @@
|
|
|
| Apply's parallelism is generally broken up into 4 cases:
|
| - If process_count == thread_count == 1, then all tasks will be executed
|
| - in this thread.
|
| + by _SequentialApply.
|
| - If process_count > 1 and thread_count == 1, then the main thread will
|
| create a new pool of processes (if they don't already exist) and each of
|
| those processes will execute the tasks in a single thread.
|
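The two remaining cases are elided by the hunk boundary above, but the docstring's count of four makes the branching easy to reconstruct; the wording of the third and fourth cases below is inferred, not quoted:

    def dispatch_mode(process_count, thread_count):
        """Apply's four parallelism cases as plain branching."""
        if process_count == 1 and thread_count == 1:
            return 'sequential (_SequentialApply)'
        if process_count > 1 and thread_count == 1:
            return 'process pool, one thread per process'
        if process_count == 1 and thread_count > 1:      # inferred case
            return 'thread pool in the current process'
        return 'process pool, thread pool per process'   # inferred case

    assert dispatch_mode(1, 1).startswith('sequential')
    assert dispatch_mode(12, 1) == 'process pool, one thread per process'
    assert dispatch_mode(1, 10) == 'thread pool in the current process'
    assert dispatch_mode(12, 10) == 'process pool, thread pool per process'
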
| @@ -1064,14 +1185,14 @@
|
| Args:
|
| caller_id: The caller ID unique to this call to command.Apply.
|
| See command.Apply for description of other arguments.
|
| - """
|
| + """
|
| is_main_thread = self.recursive_apply_level == 0
|
|
|
| # Catch ^C under Linux/MacOs so we can do cleanup before exiting.
|
| if not IS_WINDOWS and is_main_thread:
|
| signal.signal(signal.SIGINT, self._HandleMultiProcessingControlC)
|
|
|
| - if not len(task_queues):
|
| + if not task_queues:
|
| # The process we create will need to access the next recursive level
|
| # of task queues if it makes a call to Apply, so we always keep around
|
| # one more queue than we know we need. OTOH, if we don't create a new
|
| @@ -1080,18 +1201,28 @@
|
|
|
| if process_count > 1: # Handle process pool creation.
|
| # Check whether this call will need a new set of workers.
|
| - with need_pool_or_done_cond:
|
| +
|
| + # Each worker must acquire a shared lock before notifying the main thread
|
| + # that it needs a new worker pool, so that at most one worker asks for
|
| + # a new worker pool at once.
|
| + try:
|
| + if not is_main_thread:
|
| + worker_checking_level_lock.acquire()
|
| if self.recursive_apply_level >= current_max_recursive_level.value:
|
| - # Only the main thread is allowed to create new processes - otherwise,
|
| - # we will run into some Python bugs.
|
| - if is_main_thread:
|
| - self._CreateNewConsumerPool(process_count, thread_count)
|
| - else:
|
| - # Notify the main thread that we need a new consumer pool.
|
| - new_pool_needed.value = 1
|
| - need_pool_or_done_cond.notify_all()
|
| - # The main thread will notify us when it finishes.
|
| - need_pool_or_done_cond.wait()
|
| + with need_pool_or_done_cond:
|
| + # Only the main thread is allowed to create new processes -
|
| + # otherwise, we will run into some Python bugs.
|
| + if is_main_thread:
|
| + self._CreateNewConsumerPool(process_count, thread_count)
|
| + else:
|
| + # Notify the main thread that we need a new consumer pool.
|
| + new_pool_needed.value = 1
|
| + need_pool_or_done_cond.notify_all()
|
| + # The main thread will notify us when it finishes.
|
| + need_pool_or_done_cond.wait()
|
| + finally:
|
| + if not is_main_thread:
|
| + worker_checking_level_lock.release()
|
|
|
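The acquire/notify/wait sequence above is a standard condition-variable handshake: a non-main worker serializes on worker_checking_level_lock, raises a flag under need_pool_or_done_cond, and sleeps until the main thread (the only one allowed to fork processes) builds the pool. A standalone model with all names local to the sketch; note both sides test their predicate under the lock before waiting, the same lost-wakeup guard described by the "check the above conditions before the wait()" comment a few lines below:

    import threading

    request_lock = threading.Lock()   # plays the role of worker_checking_level_lock
    cond = threading.Condition()      # plays the role of need_pool_or_done_cond
    state = {'new_pool_needed': False, 'pools_created': 0}

    def worker_requests_pool():
        with request_lock:            # at most one worker negotiates at a time
            with cond:
                state['new_pool_needed'] = True
                cond.notify_all()     # wake the main thread...
                while state['pools_created'] == 0:
                    cond.wait()       # ...and wait for it to build the pool

    def main_thread_services_request():
        with cond:
            while not state['new_pool_needed']:
                cond.wait()           # predicate checked first: no lost wakeup
            state['new_pool_needed'] = False
            state['pools_created'] += 1   # stands in for _CreateNewConsumerPool()
            cond.notify_all()

    t = threading.Thread(target=worker_requests_pool)
    t.start()
    main_thread_services_request()
    t.join()
    assert state['pools_created'] == 1
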
| # If we're running in this process, create a separate task queue. Otherwise,
|
| # if Apply has already been called with process_count > 1, then there will
|
| @@ -1132,52 +1263,63 @@
|
| new_pool_needed.value = 0
|
| self._CreateNewConsumerPool(process_count, thread_count)
|
| need_pool_or_done_cond.notify_all()
|
| -
|
| +
|
| # Note that we must check the above conditions before the wait() call;
|
| # otherwise, the notification can happen before we start waiting, in
|
| # which case we'll block forever.
|
| need_pool_or_done_cond.wait()
|
| else: # Using a single process.
|
| - shard = 0
|
| - self._ApplyThreads(thread_count, self.recursive_apply_level, shard,
|
| + self._ApplyThreads(thread_count, process_count,
|
| + self.recursive_apply_level,
|
| is_blocking_call=True, task_queue=task_queue)
|
|
|
| # We encountered an exception from the producer thread before any arguments
|
| # were enqueued, but it wouldn't have been propagated, so we'll now
|
| # explicitly raise it here.
|
| if producer_thread.unknown_exception:
|
| + # pylint: disable=raising-bad-type
|
| raise producer_thread.unknown_exception
|
|
|
| # We encountered an exception from the producer thread while iterating over
|
| # the arguments, so raise it here if we're meant to fail on error.
|
| if producer_thread.iterator_exception and fail_on_error:
|
| + # pylint: disable=raising-bad-type
|
| raise producer_thread.iterator_exception
|
|
|
| - def _ApplyThreads(self, thread_count, recursive_apply_level, shard,
|
| + def _ApplyThreads(self, thread_count, process_count, recursive_apply_level,
|
| is_blocking_call=False, task_queue=None):
|
| - """
|
| - Assigns the work from the global task queue shared among all processes
|
| - to an individual process, for later consumption either by the WorkerThreads
|
| - or in this thread if thread_count == 1.
|
| -
|
| + """Assigns the work from the multi-process global task queue.
|
| +
|
| + Work is assigned to an individual process for later consumption either by
|
| + the WorkerThreads or (if thread_count == 1) this thread.
|
| +
|
| Args:
|
| thread_count: The number of threads used to perform the work. If 1, then
|
| perform all work in this thread.
|
| + process_count: The number of processes used to perform the work.
|
| recursive_apply_level: The depth in the tree of recursive calls to Apply
|
| of this thread.
|
| - shard: Assigned subset (shard number) for this function.
|
| is_blocking_call: True iff the call to Apply is blocked on this call
|
| (which is true iff process_count == 1), implying that
|
| _ApplyThreads must behave as a blocking call.
|
| """
|
| self._ResetConnectionPool()
|
| -
|
| + self.recursive_apply_level = recursive_apply_level
|
| +
|
| task_queue = task_queue or task_queues[recursive_apply_level]
|
|
|
| + assert thread_count * process_count > 1, (
|
| + 'Invalid state, calling command._ApplyThreads with only one thread '
|
| + 'and process.')
|
| if thread_count > 1:
|
| - worker_pool = WorkerPool(thread_count)
|
| - else:
|
| - worker_pool = SameThreadWorkerPool(self)
|
| + worker_pool = WorkerPool(
|
| + thread_count, self.logger,
|
| + bucket_storage_uri_class=self.bucket_storage_uri_class,
|
| + gsutil_api_map=self.gsutil_api_map, debug=self.debug)
|
| + elif process_count > 1:
|
| + worker_pool = SameThreadWorkerPool(
|
| + self, bucket_storage_uri_class=self.bucket_storage_uri_class,
|
| + gsutil_api_map=self.gsutil_api_map, debug=self.debug)
|
|
|
| num_enqueued = 0
|
| while True:
|
| @@ -1212,6 +1354,7 @@
|
|
|
|
|
| class _ConsumerPool(object):
|
| +
|
| def __init__(self, processes, task_queue):
|
| self.processes = processes
|
| self.task_queue = task_queue
|
| @@ -1229,7 +1372,7 @@
|
| kernel32 = ctypes.windll.kernel32
|
| handle = kernel32.OpenProcess(1, 0, pid)
|
| kernel32.TerminateProcess(handle, 0)
|
| - except:
|
| + except: # pylint: disable=bare-except
|
| pass
|
| else:
|
| try:
|
| @@ -1238,10 +1381,11 @@
|
| pass
|
|
|
|
|
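_ConsumerPool.ShutDown has to hard-kill workers: ctypes' TerminateProcess on Windows, and (in the lines elided between the two hunks above) presumably a POSIX kill elsewhere, with errors ignored because a worker may already have exited. A hedged sketch of that pattern; the os.kill/SIGKILL branch is an assumption about the elided code:

    import os
    import signal
    import sys

    def force_kill(pid):
        """Best-effort hard kill of a worker process by PID."""
        try:
            if sys.platform == 'win32':
                import ctypes
                kernel32 = ctypes.windll.kernel32
                handle = kernel32.OpenProcess(1, 0, pid)  # 1 == PROCESS_TERMINATE
                kernel32.TerminateProcess(handle, 0)
                kernel32.CloseHandle(handle)
            else:
                os.kill(pid, signal.SIGKILL)  # assumed POSIX branch
        except Exception:
            pass  # the process may already be gone; shutdown is best-effort
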
| -class Task(namedtuple('Task',
|
| - 'func args caller_id exception_handler should_return_results arg_checker ' +
|
| - 'fail_on_error')):
|
| - """
|
| +class Task(namedtuple('Task', (
|
| + 'func args caller_id exception_handler should_return_results arg_checker '
|
| + 'fail_on_error'))):
|
| + """Task class representing work to be completed.
|
| +
|
| Args:
|
| func: The function to be executed.
|
| args: The arguments to func.
|
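Because Task is a plain namedtuple, instances are immutable and cheap to pass through the multiprocessing queues. A small usage sketch (the noop_func task function and its arguments are illustrative):

    from collections import namedtuple

    Task = namedtuple('Task', (
        'func args caller_id exception_handler should_return_results arg_checker '
        'fail_on_error'))

    def noop_func(cls, args, thread_state=None):  # illustrative task function
        return args

    task = Task(func=noop_func, args=('gs://bucket/obj',), caller_id=0,
                exception_handler=None, should_return_results=True,
                arg_checker=None, fail_on_error=False)
    assert task.args == ('gs://bucket/obj',)
    assert task._replace(fail_on_error=True).fail_on_error  # copies, never mutates
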
| @@ -1261,10 +1405,12 @@
|
|
|
| class ProducerThread(threading.Thread):
|
| """Thread used to enqueue work for other processes and threads."""
|
| +
|
| def __init__(self, cls, args_iterator, caller_id, func, task_queue,
|
| should_return_results, exception_handler, arg_checker,
|
| fail_on_error):
|
| - """
|
| + """Initializes the producer thread.
|
| +
|
| Args:
|
| cls: Instance of Command for which this ProducerThread was created.
|
| args_iterator: Iterable collection of arguments to be put into the
|
| @@ -1311,14 +1457,15 @@
|
| args = args_iterator.next()
|
| except StopIteration, e:
|
| break
|
| - except Exception, e:
|
| + except Exception, e: # pylint: disable=broad-except
|
| + _IncrementFailureCount()
|
| if self.fail_on_error:
|
| self.iterator_exception = e
|
| raise
|
| else:
|
| try:
|
| self.exception_handler(self.cls, e)
|
| - except Exception, e1:
|
| + except Exception, _: # pylint: disable=broad-except
|
| self.cls.logger.debug(
|
| 'Caught exception while handling exception for %s:\n%s',
|
| self.func, traceback.format_exc())
|
| @@ -1332,8 +1479,8 @@
|
| self.exception_handler, self.should_return_results,
|
| self.arg_checker, self.fail_on_error)
|
| if last_task:
|
| - self.task_queue.put(last_task, self.caller_id)
|
| - except Exception, e:
|
| + self.task_queue.put(last_task)
|
| + except Exception, e: # pylint: disable=broad-except
|
| # This will also catch any exception raised due to an error in the
|
| # iterator when fail_on_error is set, so check that we failed for some
|
| # other reason before claiming that we had an unknown exception.
|
| @@ -1350,31 +1497,41 @@
|
| # This happens if there were zero arguments to be put in the queue.
|
| cur_task = Task(None, ZERO_TASKS_TO_DO_ARGUMENT, self.caller_id,
|
| None, None, None, None)
|
| - self.task_queue.put(cur_task, self.caller_id)
|
| + self.task_queue.put(cur_task)
|
|
|
| # It's possible that the workers finished before we updated total_tasks,
|
| # so we need to check here as well.
|
| _NotifyIfDone(self.caller_id,
|
| - caller_id_finished_count.get(self.caller_id))
|
| -
|
| + caller_id_finished_count.Get(self.caller_id))
|
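The ZERO_TASKS_TO_DO_ARGUMENT dance above exists because a consumer blocked on queue.get() can only be released by a put; if the iterator yielded nothing, the producer enqueues one marker task so workers and waiters still make progress. A generic model of that sentinel pattern (names local to the sketch):

    import threading
    try:
        import queue               # Python 3
    except ImportError:
        import Queue as queue      # Python 2, as in this file's imports

    ZERO_TASKS_SENTINEL = object()  # stands in for ZERO_TASKS_TO_DO_ARGUMENT

    def produce(args_iterator, task_queue):
        enqueued = 0
        for args in args_iterator:  # real code also routes iterator errors
            task_queue.put(args)    # through exception_handler here
            enqueued += 1
        if enqueued == 0:
            # A consumer blocked on get() would otherwise never learn that
            # there is no work, so enqueue one do-nothing marker task.
            task_queue.put(ZERO_TASKS_SENTINEL)

    q = queue.Queue()
    t = threading.Thread(target=produce, args=(iter([]), q))
    t.start()
    t.join()
    assert q.get() is ZERO_TASKS_SENTINEL
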
|
|
| +
|
| class SameThreadWorkerPool(object):
|
| """Behaves like a WorkerPool, but used for the single-threaded case."""
|
| - def __init__(self, cls):
|
| +
|
| + def __init__(self, cls, bucket_storage_uri_class=None,
|
| + gsutil_api_map=None, debug=0):
|
| self.cls = cls
|
| - self.worker_thread = WorkerThread(None)
|
| -
|
| + self.worker_thread = WorkerThread(
|
| + None, cls.logger,
|
| + bucket_storage_uri_class=bucket_storage_uri_class,
|
| + gsutil_api_map=gsutil_api_map, debug=debug)
|
| +
|
| def AddTask(self, task):
|
| self.worker_thread.PerformTask(task, self.cls)
|
|
|
|
|
| class WorkerPool(object):
|
| """Pool of worker threads to which tasks can be added."""
|
| - def __init__(self, thread_count):
|
| +
|
| + def __init__(self, thread_count, logger, bucket_storage_uri_class=None,
|
| + gsutil_api_map=None, debug=0):
|
| self.task_queue = _NewThreadsafeQueue()
|
| self.threads = []
|
| for _ in range(thread_count):
|
| - worker_thread = WorkerThread(self.task_queue)
|
| + worker_thread = WorkerThread(
|
| + self.task_queue, logger,
|
| + bucket_storage_uri_class=bucket_storage_uri_class,
|
| + gsutil_api_map=gsutil_api_map, debug=debug)
|
| self.threads.append(worker_thread)
|
| worker_thread.start()
|
|
|
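WorkerPool starts its threads immediately, marks them daemonic (mirroring WorkerThread's self.daemon = True) so they never block interpreter exit, and has them loop forever on one shared queue. A miniature with the same shape; the task_done()/join() bookkeeping is added here so the demo can wait for completion:

    import threading
    try:
        import queue
    except ImportError:
        import Queue as queue

    class MiniWorkerPool(object):
        """Shape of WorkerPool: daemon threads looping on one shared queue."""

        def __init__(self, thread_count, handler):
            self.task_queue = queue.Queue()
            self.threads = []
            for _ in range(thread_count):
                t = threading.Thread(target=self._run, args=(handler,))
                t.daemon = True   # idle workers never block interpreter exit
                t.start()
                self.threads.append(t)

        def _run(self, handler):
            while True:
                task = self.task_queue.get()
                try:
                    handler(task)
                finally:
                    self.task_queue.task_done()

        def AddTask(self, task):
            self.task_queue.put(task)

    results = []
    pool = MiniWorkerPool(2, results.append)
    for i in range(4):
        pool.AddTask(i)
    pool.task_queue.join()        # block until all four tasks are handled
    assert sorted(results) == [0, 1, 2, 3]
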
| @@ -1383,52 +1540,63 @@
|
|
|
|
|
| class WorkerThread(threading.Thread):
|
| - """
|
| - This thread is where all of the work will be performed in actually making the
|
| - function calls for Apply. It takes care of all error handling, return value
|
| - propagation, and shared_vars.
|
| + """Thread where all the work will be performed.
|
|
|
| + This makes the function calls for Apply and takes care of all error handling,
|
| + return value propagation, and shared_vars.
|
| +
|
| Note that this thread is NOT started upon instantiation because the function-
|
| calling logic is also used in the single-threaded case.
|
| """
|
| - def __init__(self, task_queue, multiprocessing_is_available=True):
|
| - """
|
| +
|
| + def __init__(self, task_queue, logger, bucket_storage_uri_class=None,
|
| + gsutil_api_map=None, debug=0):
|
| + """Initializes the worker thread.
|
| +
|
| Args:
|
| task_queue: The thread-safe queue from which this thread should obtain
|
| its work.
|
| - multiprocessing_is_available: False iff the multiprocessing module is not
|
| - available, in which case we're using
|
| - _SequentialApply.
|
| + logger: Logger to use for this thread.
|
| + bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
|
| + Settable for testing/mocking.
|
| + gsutil_api_map: Map of providers and API selector tuples to api classes
|
| + which can be used to communicate with those providers.
|
| + Used for instantiating the CloudApiDelegator class.
|
| + debug: debug level for the CloudApiDelegator class.
|
| """
|
| super(WorkerThread, self).__init__()
|
| self.task_queue = task_queue
|
| - self.multiprocessing_is_available = multiprocessing_is_available
|
| self.daemon = True
|
| self.cached_classes = {}
|
| self.shared_vars_updater = _SharedVariablesUpdater()
|
|
|
| + self.thread_gsutil_api = None
|
| + if bucket_storage_uri_class and gsutil_api_map:
|
| + self.thread_gsutil_api = CloudApiDelegator(
|
| + bucket_storage_uri_class, gsutil_api_map, logger, debug=debug)
|
| +
|
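Constructing one CloudApiDelegator per WorkerThread, then threading it into every call as thread_state (see PerformTask below), gives each thread a private API client instead of sharing one connection. A generic model of that per-thread-client pattern; FakeClient and copy_task are illustrative:

    import threading

    class FakeClient(object):
        """Stands in for a client that must not be shared across threads."""

        def call(self, arg):
            return (threading.current_thread().name, arg)

    class MiniWorker(threading.Thread):

        def __init__(self, tasks, results):
            super(MiniWorker, self).__init__()
            self.tasks = tasks
            self.results = results
            self.client = FakeClient()  # one per worker, like thread_gsutil_api

        def run(self):
            for func, args in self.tasks:
                # Mirrors task.func(cls, task.args, thread_state=self.thread_gsutil_api).
                self.results.append(func(args, thread_state=self.client))

    def copy_task(args, thread_state=None):
        return thread_state.call(args)

    results = []
    w = MiniWorker([(copy_task, 'obj1'), (copy_task, 'obj2')], results)
    w.start()
    w.join()
    assert [arg for (_, arg) in results] == ['obj1', 'obj2']
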
| def PerformTask(self, task, cls):
|
| - """
|
| - Makes the function call for a task.
|
| + """Makes the function call for a task.
|
|
|
| Args:
|
| task: The Task to perform.
|
| cls: The instance of a class which gives context to the functions called
|
| - by the Task's function. E.g., see _SetAclFuncWrapper.
|
| + by the Task's function. E.g., see SetAclFuncWrapper.
|
| """
|
| caller_id = task.caller_id
|
| try:
|
| - results = task.func(cls, task.args)
|
| + results = task.func(cls, task.args, thread_state=self.thread_gsutil_api)
|
| if task.should_return_results:
|
| - global_return_values_map.update(caller_id, [results], default_value=[])
|
| - except Exception, e:
|
| + global_return_values_map.Update(caller_id, [results], default_value=[])
|
| + except Exception, e: # pylint: disable=broad-except
|
| + _IncrementFailureCount()
|
| if task.fail_on_error:
|
| raise # Only happens for single thread and process case.
|
| else:
|
| try:
|
| task.exception_handler(cls, e)
|
| - except Exception, e1:
|
| - # Don't allow callers to throw exceptions here and kill the worker
|
| + except Exception, _: # pylint: disable=broad-except
|
| + # Don't allow callers to raise exceptions here and kill the worker
|
| # threads.
|
| cls.logger.debug(
|
| 'Caught exception while handling exception for %s:\n%s',
|
| @@ -1436,13 +1604,14 @@
|
| finally:
|
| self.shared_vars_updater.Update(caller_id, cls)
|
|
|
| - # Even if we encounter an exception, we still need to claim that that
|
| - # the function finished executing. Otherwise, we won't know when to
|
| - # stop waiting and return results.
|
| - num_done = caller_id_finished_count.update(caller_id, 1)
|
| - if self.multiprocessing_is_available:
|
| - _NotifyIfDone(caller_id, num_done)
|
| + # Even if we encounter an exception, we still need to claim that
|
| + # the function finished executing. Otherwise, we won't know when to
|
| + # stop waiting and return results.
|
| + num_done = caller_id_finished_count.Update(caller_id, 1)
|
|
|
| + if cls.multiprocessing_is_available:
|
| + _NotifyIfDone(caller_id, num_done)
|
| +
|
| def run(self):
|
| while True:
|
| task = self.task_queue.get()
|
| @@ -1459,11 +1628,12 @@
|
|
|
|
|
| class _SharedVariablesUpdater(object):
|
| - """Used to update shared variable for a class in the global map. Note that
|
| - each thread will have its own instance of the calling class for context,
|
| - and it will also have its own instance of a _SharedVariablesUpdater.
|
| - This is used in the following way:
|
| + """Used to update shared variable for a class in the global map.
|
|
|
| + Note that each thread will have its own instance of the calling class for
|
| + context, and it will also have its own instance of a
|
| + _SharedVariablesUpdater. This is used in the following way:
|
| +
|
| 1. Before any tasks are performed, each thread will get a copy of the
|
| calling class, and the globally-consistent value of this shared variable
|
| will be initialized to whatever it was before the call to Apply began.
|
| @@ -1480,7 +1650,8 @@
|
| as the globally-consistent value shared across all classes (the
|
| globally consistent value is simply increased by the computed
|
| delta).
|
| - """
|
| + """
|
| +
|
| def __init__(self):
|
| self.last_shared_var_values = {}
|
|
|
| @@ -1499,19 +1670,19 @@
|
|
|
| # Update the globally-consistent value by simply increasing it by the
|
| # computed delta.
|
| - shared_vars_map.update(key, delta)
|
| + shared_vars_map.Update(key, delta)
|
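The delta scheme lets any number of threads fold their private progress into one global value without clobbering each other: each updater remembers the last value it reported per (caller_id, name) key and pushes only the difference. A minimal model, with a plain dict in place of the locked shared_vars_map:

    class MiniSharedVarsUpdater(object):
        """Per-thread tracker that contributes deltas, not absolute values."""

        def __init__(self):
            self.last_values = {}

        def update(self, key, current_value, global_map):
            delta = current_value - self.last_values.get(key, 0)
            self.last_values[key] = current_value
            # Real code: shared_vars_map.Update(key, delta), a locked increment.
            global_map[key] = global_map.get(key, 0) + delta

    shared = {}
    key = ('caller0', 'total_bytes')
    u1, u2 = MiniSharedVarsUpdater(), MiniSharedVarsUpdater()
    u1.update(key, 10, shared)   # thread 1 has copied 10 bytes so far
    u2.update(key, 7, shared)    # thread 2 has copied 7
    u1.update(key, 25, shared)   # thread 1 advances to 25
    assert shared[key] == 32     # 25 + 7: per-thread deltas compose correctly
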
|
|
|
|
| def _NotifyIfDone(caller_id, num_done):
|
| - """
|
| - Notify any threads that are waiting for results that something has finished.
|
| + """Notify any threads waiting for results that something has finished.
|
| +
|
| Each waiting thread will then need to check the call_completed_map to see if
|
| its work is done.
|
| -
|
| +
|
| Note that num_done could be calculated here, but it is passed in as an
|
| optimization so that we have one less call to a globally-locked data
|
| structure.
|
| -
|
| +
|
| Args:
|
| caller_id: The caller_id of the function whose progress we're checking.
|
| num_done: The number of tasks currently completed for that caller_id.
|
| @@ -1523,13 +1694,46 @@
|
| call_completed_map[caller_id] = True
|
| need_pool_or_done_cond.notify_all()
|
|
|
| +
|
| def ShutDownGsutil():
|
| """Shut down all processes in consumer pools in preparation for exiting."""
|
| for q in queues:
|
| try:
|
| q.cancel_join_thread()
|
| - except:
|
| + except: # pylint: disable=bare-except
|
| pass
|
| for consumer_pool in consumer_pools:
|
| consumer_pool.ShutDown()
|
|
|
| +
|
| +# pylint: disable=global-variable-undefined
|
| +def _IncrementFailureCount():
|
| + global failure_count
|
| + if isinstance(failure_count, int):
|
| + failure_count += 1
|
| + else: # Otherwise it's a multiprocessing.Value() of type 'i'.
|
| + failure_count.value += 1
|
| +
|
| +
|
| +# pylint: disable=global-variable-undefined
|
| +def GetFailureCount():
|
| + """Returns the number of failures processed during calls to Apply()."""
|
| + try:
|
| + if isinstance(failure_count, int):
|
| + return failure_count
|
| + else: # It's a multiprocessing.Value() of type 'i'.
|
| + return failure_count.value
|
| + except NameError: # If it wasn't initialized, Apply() wasn't called.
|
| + return 0
|
| +
|
| +
|
| +def ResetFailureCount():
|
| + """Resets the failure_count variable to 0 - useful if error is expected."""
|
| + try:
|
| + global failure_count
|
| + if isinstance(failure_count, int):
|
| + failure_count = 0
|
| + else: # It's a multiprocessing.Value() of type 'i'.
|
| + failure_count = multiprocessing.Value('i', 0)
|
| + except NameError: # If it wasn't initialized, Apply() wasn't called.
|
| + pass
|
|
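These helpers deliberately work against either representation of failure_count: a plain int when Apply stayed single-process, or a multiprocessing.Value('i') shared across a process pool. A standalone sketch of the same dual-representation counter; unlike the original it takes Value.get_lock() explicitly, since += on a Value is a read-modify-write, not an atomic operation:

    import multiprocessing

    failure_count = 0  # swapped for multiprocessing.Value('i', 0) in multi-process runs

    def increment_failure_count():
        global failure_count
        if isinstance(failure_count, int):
            failure_count += 1
        else:
            with failure_count.get_lock():  # guard the read-modify-write
                failure_count.value += 1

    def get_failure_count():
        if isinstance(failure_count, int):
            return failure_count
        return failure_count.value

    increment_failure_count()
    assert get_failure_count() == 1
    failure_count = multiprocessing.Value('i', 0)
    increment_failure_count()
    assert get_failure_count() == 1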
|