Chromium Code Reviews

Unified Diff: third_party/gsutil/gslib/command.py

Issue 1377933002: [catapult] - Copy Telemetry's gsutilz over to third_party. (Closed) Base URL: https://github.com/catapult-project/catapult.git@master
Patch Set: Rename to gsutil. Created 5 years, 3 months ago
Index: third_party/gsutil/gslib/command.py
diff --git a/third_party/gsutil/gslib/command.py b/third_party/gsutil/gslib/command.py
new file mode 100644
index 0000000000000000000000000000000000000000..3823b91505696bf08f8c3edd03d4af061a450575
--- /dev/null
+++ b/third_party/gsutil/gslib/command.py
@@ -0,0 +1,1810 @@
+# -*- coding: utf-8 -*-
+# Copyright 2010 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Base class for gsutil commands.
+
+In addition to base class code, this file contains helpers that depend on base
+class state (such as GetAndPrintAcl). In general, functions that depend on
+class state and that are used by multiple commands belong in this file.
+Functions that don't depend on class state belong in util.py, and non-shared
+helpers belong in individual subclasses.
+"""
+
+from __future__ import absolute_import
+
+import codecs
+from collections import namedtuple
+import copy
+import getopt
+import logging
+import multiprocessing
+import os
+import Queue
+import signal
+import sys
+import textwrap
+import threading
+import traceback
+
+import boto
+from boto.storage_uri import StorageUri
+import gslib
+from gslib.cloud_api import AccessDeniedException
+from gslib.cloud_api import ArgumentException
+from gslib.cloud_api import ServiceException
+from gslib.cloud_api_delegator import CloudApiDelegator
+from gslib.cs_api_map import ApiSelector
+from gslib.cs_api_map import GsutilApiMapFactory
+from gslib.exception import CommandException
+from gslib.help_provider import HelpProvider
+from gslib.name_expansion import NameExpansionIterator
+from gslib.name_expansion import NameExpansionResult
+from gslib.parallelism_framework_util import AtomicIncrementDict
+from gslib.parallelism_framework_util import BasicIncrementDict
+from gslib.parallelism_framework_util import ThreadAndProcessSafeDict
+from gslib.plurality_checkable_iterator import PluralityCheckableIterator
+from gslib.sig_handling import RegisterSignalHandler
+from gslib.storage_url import StorageUrlFromString
+from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
+from gslib.translation_helper import AclTranslation
+from gslib.util import GetConfigFilePath
+from gslib.util import GsutilStreamHandler
+from gslib.util import HaveFileUrls
+from gslib.util import HaveProviderUrls
+from gslib.util import IS_WINDOWS
+from gslib.util import MultiprocessingIsAvailable
+from gslib.util import NO_MAX
+from gslib.util import UrlsAreForSingleProvider
+from gslib.util import UTF8
+from gslib.wildcard_iterator import CreateWildcardIterator
+
+OFFER_GSUTIL_M_SUGGESTION_THRESHOLD = 5
+
+if IS_WINDOWS:
+ import ctypes # pylint: disable=g-import-not-at-top
+
+
+def _DefaultExceptionHandler(cls, e):
+ cls.logger.exception(e)
+
+
+def CreateGsutilLogger(command_name):
+ """Creates a logger that resembles 'print' output.
+
+ This logger abides by gsutil -d/-D/-DD/-q options.
+
+ By default (if none of the above options is specified) the logger will display
+ all messages logged with level INFO or above. Log propagation is disabled.
+
+ Args:
+ command_name: Command name to create logger for.
+
+ Returns:
+ A logger object.
+ """
+ log = logging.getLogger(command_name)
+ log.propagate = False
+ log.setLevel(logging.root.level)
+ log_handler = GsutilStreamHandler()
+ log_handler.setFormatter(logging.Formatter('%(message)s'))
+ # Commands that call other commands (like mv) would cause log handlers to be
+ # added more than once, so avoid adding if one is already present.
+ if not log.handlers:
+ log.addHandler(log_handler)
+ return log
+
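+# Illustrative example (not part of the gsutil source; the command name and
+# messages are made up): a logger created by CreateGsutilLogger emits bare
+# messages at INFO and above and does not propagate to the root logger:
+#
+#   logger = CreateGsutilLogger('ls')
+#   logger.info('Listing bucket contents...')  # printed like plain output
+#   logger.debug('Shown only when -d/-D sets the root logging level to DEBUG.')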
+
+def _UrlArgChecker(command_instance, url):
+ if not command_instance.exclude_symlinks:
+ return True
+ exp_src_url = url.expanded_storage_url
+ if exp_src_url.IsFileUrl() and os.path.islink(exp_src_url.object_name):
+ command_instance.logger.info('Skipping symbolic link %s...', exp_src_url)
+ return False
+ return True
+
+
+def DummyArgChecker(*unused_args):
+ return True
+
+
+def SetAclFuncWrapper(cls, name_expansion_result, thread_state=None):
+ return cls.SetAclFunc(name_expansion_result, thread_state=thread_state)
+
+
+def SetAclExceptionHandler(cls, e):
+ """Exception handler that maintains state about post-completion status."""
+ cls.logger.error(str(e))
+ cls.everything_set_okay = False
+
+# We will keep this list of all thread- or process-safe queues ever created by
+# the main thread so that we can forcefully kill them upon shutdown. Otherwise,
+# we encounter a Python bug in which empty queues block forever on join (which
+# is called as part of the Python exit function cleanup) under the impression
+# that they are non-empty.
+# However, this also lets us shut down somewhat more cleanly when interrupted.
+queues = []
+
+
+def _NewMultiprocessingQueue():
+ queue = multiprocessing.Queue(MAX_QUEUE_SIZE)
+ queues.append(queue)
+ return queue
+
+
+def _NewThreadsafeQueue():
+ queue = Queue.Queue(MAX_QUEUE_SIZE)
+ queues.append(queue)
+ return queue
+
+# The maximum size of a process- or thread-safe queue. Imposing this limit
+# prevents us from needing to hold an arbitrary amount of data in memory.
+# However, setting this number too high (e.g., >= 32768 on OS X) can cause
+# problems on some operating systems.
+MAX_QUEUE_SIZE = 32500
+
+# The maximum depth of the tree of recursive calls to command.Apply. This is
+# an arbitrary limit put in place to prevent developers from accidentally
+# causing problems with infinite recursion, and it can be increased if needed.
+MAX_RECURSIVE_DEPTH = 5
+
+ZERO_TASKS_TO_DO_ARGUMENT = ('There were no', 'tasks to do')
+
+# Map from deprecated aliases to the current command and subcommands that
+# provide the same behavior.
+# TODO: Remove this map and deprecate old commands on 9/9/14.
+OLD_ALIAS_MAP = {'chacl': ['acl', 'ch'],
+ 'getacl': ['acl', 'get'],
+ 'setacl': ['acl', 'set'],
+ 'getcors': ['cors', 'get'],
+ 'setcors': ['cors', 'set'],
+ 'chdefacl': ['defacl', 'ch'],
+ 'getdefacl': ['defacl', 'get'],
+ 'setdefacl': ['defacl', 'set'],
+ 'disablelogging': ['logging', 'set', 'off'],
+ 'enablelogging': ['logging', 'set', 'on'],
+ 'getlogging': ['logging', 'get'],
+ 'getversioning': ['versioning', 'get'],
+ 'setversioning': ['versioning', 'set'],
+ 'getwebcfg': ['web', 'get'],
+ 'setwebcfg': ['web', 'set']}
+
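+# Example (illustrative; the bucket name is made up): with the map above, the
+# deprecated invocation
+#
+#   gsutil getacl gs://some-bucket
+#
+# behaves like
+#
+#   gsutil acl get gs://some-bucket
+#
+# Command._TranslateDeprecatedAliases (below) prepends the subcommand ('get')
+# to the parsed args and logs a deprecation warning.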
+
+# Declare all of the module level variables - see
+# InitializeMultiprocessingVariables for an explanation of why this is
+# necessary.
+# pylint: disable=global-at-module-level
+global manager, consumer_pools, task_queues, caller_id_lock, caller_id_counter
+global total_tasks, call_completed_map, global_return_values_map
+global need_pool_or_done_cond, caller_id_finished_count, new_pool_needed
+global current_max_recursive_level, shared_vars_map, shared_vars_list_map
+global class_map, worker_checking_level_lock, failure_count
+
+
+def InitializeMultiprocessingVariables():
+ """Initializes module-level variables that will be inherited by subprocesses.
+
+  On Windows, a multiprocessing.Manager object should only be created within
+  an "if __name__ == '__main__':" block, so this function must be called from
+  the program's main entry point; otherwise every command that calls
+  Command.Apply will fail.
+ """
+ # This list of global variables must exactly match the above list of
+ # declarations.
+ # pylint: disable=global-variable-undefined
+ global manager, consumer_pools, task_queues, caller_id_lock, caller_id_counter
+ global total_tasks, call_completed_map, global_return_values_map
+ global need_pool_or_done_cond, caller_id_finished_count, new_pool_needed
+ global current_max_recursive_level, shared_vars_map, shared_vars_list_map
+ global class_map, worker_checking_level_lock, failure_count
+
+ manager = multiprocessing.Manager()
+
+ consumer_pools = []
+
+ # List of all existing task queues - used by all pools to find the queue
+ # that's appropriate for the given recursive_apply_level.
+ task_queues = []
+
+ # Used to assign a globally unique caller ID to each Apply call.
+ caller_id_lock = manager.Lock()
+ caller_id_counter = multiprocessing.Value('i', 0)
+
+ # Map from caller_id to total number of tasks to be completed for that ID.
+ total_tasks = ThreadAndProcessSafeDict(manager)
+
+ # Map from caller_id to a boolean which is True iff all its tasks are
+ # finished.
+ call_completed_map = ThreadAndProcessSafeDict(manager)
+
+ # Used to keep track of the set of return values for each caller ID.
+ global_return_values_map = AtomicIncrementDict(manager)
+
+ # Condition used to notify any waiting threads that a task has finished or
+ # that a call to Apply needs a new set of consumer processes.
+ need_pool_or_done_cond = manager.Condition()
+
+ # Lock used to prevent multiple worker processes from asking the main thread
+ # to create a new consumer pool for the same level.
+ worker_checking_level_lock = manager.Lock()
+
+ # Map from caller_id to the current number of completed tasks for that ID.
+ caller_id_finished_count = AtomicIncrementDict(manager)
+
+ # Used as a way for the main thread to distinguish between being woken up
+ # by another call finishing and being woken up by a call that needs a new set
+ # of consumer processes.
+ new_pool_needed = multiprocessing.Value('i', 0)
+
+ current_max_recursive_level = multiprocessing.Value('i', 0)
+
+ # Map from (caller_id, name) to the value of that shared variable.
+ shared_vars_map = AtomicIncrementDict(manager)
+ shared_vars_list_map = ThreadAndProcessSafeDict(manager)
+
+ # Map from caller_id to calling class.
+ class_map = manager.dict()
+
+ # Number of tasks that resulted in an exception in calls to Apply().
+ failure_count = multiprocessing.Value('i', 0)
+
+
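+# Illustrative usage sketch (assumed entry-point shape, not gsutil's actual
+# main module): the function above is expected to run in the parent process
+# before any call to Command.Apply, e.g.:
+#
+#   if __name__ == '__main__':
+#     if MultiprocessingIsAvailable()[0]:
+#       InitializeMultiprocessingVariables()
+#     # ... parse arguments and dispatch to the requested Command ...
+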
+# Each subclass of Command must define a property named 'command_spec' that is
+# an instance of the following class.
+CommandSpec = namedtuple('CommandSpec', [
+ # Name of command.
+ 'command_name',
+ # Usage synopsis.
+ 'usage_synopsis',
+ # List of command name aliases.
+ 'command_name_aliases',
+ # Min number of args required by this command.
+ 'min_args',
+ # Max number of args required by this command, or NO_MAX.
+ 'max_args',
+ # Getopt-style string specifying acceptable sub args.
+ 'supported_sub_args',
+ # True if file URLs are acceptable for this command.
+ 'file_url_ok',
+ # True if provider-only URLs are acceptable for this command.
+ 'provider_url_ok',
+ # Index in args of first URL arg.
+ 'urls_start_arg',
+ # List of supported APIs
+ 'gs_api_support',
+ # Default API to use for this command
+ 'gs_default_api',
+ # Private arguments (for internal testing)
+ 'supported_private_args',
+ 'argparse_arguments',
+])
+
+
+class Command(HelpProvider):
+ """Base class for all gsutil commands."""
+
+ # Each subclass must override this with an instance of CommandSpec.
+ command_spec = None
+
+ _commands_with_subcommands_and_subopts = ['acl', 'defacl', 'logging', 'web',
+ 'notification']
+
+ # This keeps track of the recursive depth of the current call to Apply.
+ recursive_apply_level = 0
+
+ # If the multiprocessing module isn't available, we'll use this to keep track
+ # of the caller_id.
+ sequential_caller_id = -1
+
+ @staticmethod
+ def CreateCommandSpec(command_name, usage_synopsis=None,
+ command_name_aliases=None, min_args=0,
+ max_args=NO_MAX, supported_sub_args='',
+ file_url_ok=False, provider_url_ok=False,
+ urls_start_arg=0, gs_api_support=None,
+ gs_default_api=None, supported_private_args=None,
+ argparse_arguments=None):
+ """Creates an instance of CommandSpec, with defaults."""
+ return CommandSpec(
+ command_name=command_name,
+ usage_synopsis=usage_synopsis,
+ command_name_aliases=command_name_aliases or [],
+ min_args=min_args,
+ max_args=max_args,
+ supported_sub_args=supported_sub_args,
+ file_url_ok=file_url_ok,
+ provider_url_ok=provider_url_ok,
+ urls_start_arg=urls_start_arg,
+ gs_api_support=gs_api_support or [ApiSelector.XML],
+ gs_default_api=gs_default_api or ApiSelector.XML,
+ supported_private_args=supported_private_args,
+ argparse_arguments=argparse_arguments or [])
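+
+  # Example (illustrative sketch, not part of gsutil): a hypothetical command
+  # could declare its spec via the helper above, relying on the defaults for
+  # any fields it leaves out:
+  #
+  #   class FooCommand(Command):
+  #     command_spec = Command.CreateCommandSpec(
+  #         'foo', usage_synopsis='gsutil foo url...',
+  #         min_args=1, max_args=NO_MAX, supported_sub_args='rR',
+  #         urls_start_arg=0)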
+
+  # Define a convenience property for command name, since it's used in many
+  # places.
+ def _GetDefaultCommandName(self):
+ return self.command_spec.command_name
+ command_name = property(_GetDefaultCommandName)
+
+ def _CalculateUrlsStartArg(self):
+ """Calculate the index in args of the first URL arg.
+
+ Returns:
+ Index of the first URL arg (according to the command spec).
+ """
+ return self.command_spec.urls_start_arg
+
+ def _TranslateDeprecatedAliases(self, args):
+ """Map deprecated aliases to the corresponding new command, and warn."""
+ new_command_args = OLD_ALIAS_MAP.get(self.command_alias_used, None)
+ if new_command_args:
+ # Prepend any subcommands for the new command. The command name itself
+ # is not part of the args, so leave it out.
+ args = new_command_args[1:] + args
+ self.logger.warn('\n'.join(textwrap.wrap(
+ ('You are using a deprecated alias, "%(used_alias)s", for the '
+ '"%(command_name)s" command. This will stop working on 9/9/2014. '
+ 'Please use "%(command_name)s" with the appropriate sub-command in '
+ 'the future. See "gsutil help %(command_name)s" for details.') %
+ {'used_alias': self.command_alias_used,
+ 'command_name': self.command_name})))
+ return args
+
+ def __init__(self, command_runner, args, headers, debug, parallel_operations,
+ bucket_storage_uri_class, gsutil_api_class_map_factory,
+ test_method=None, logging_filters=None,
+ command_alias_used=None):
+ """Instantiates a Command.
+
+ Args:
+ command_runner: CommandRunner (for commands built atop other commands).
+ args: Command-line args (arg0 = actual arg, not command name ala bash).
+ headers: Dictionary containing optional HTTP headers to pass to boto.
+ debug: Debug level to pass in to boto connection (range 0..3).
+ parallel_operations: Should command operations be executed in parallel?
+ bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
+ Settable for testing/mocking.
+ gsutil_api_class_map_factory: Creates map of cloud storage interfaces.
+ Settable for testing/mocking.
+ test_method: Optional general purpose method for testing purposes.
+ Application and semantics of this method will vary by
+ command and test type.
+ logging_filters: Optional list of logging.Filters to apply to this
+ command's logger.
+ command_alias_used: The alias that was actually used when running this
+ command (as opposed to the "official" command name,
+ which will always correspond to the file name).
+
+ Implementation note: subclasses shouldn't need to define an __init__
+ method, and instead depend on the shared initialization that happens
+ here. If you do define an __init__ method in a subclass you'll need to
+ explicitly call super().__init__(). But you're encouraged not to do this,
+ because it will make changing the __init__ interface more painful.
+ """
+ # Save class values from constructor params.
+ self.command_runner = command_runner
+ self.unparsed_args = args
+ self.headers = headers
+ self.debug = debug
+ self.parallel_operations = parallel_operations
+ self.bucket_storage_uri_class = bucket_storage_uri_class
+ self.gsutil_api_class_map_factory = gsutil_api_class_map_factory
+ self.test_method = test_method
+ self.exclude_symlinks = False
+ self.recursion_requested = False
+ self.all_versions = False
+ self.command_alias_used = command_alias_used
+
+ # Global instance of a threaded logger object.
+ self.logger = CreateGsutilLogger(self.command_name)
+ if logging_filters:
+ for log_filter in logging_filters:
+ self.logger.addFilter(log_filter)
+
+ if self.command_spec is None:
+ raise CommandException('"%s" command implementation is missing a '
+ 'command_spec definition.' % self.command_name)
+
+ # Parse and validate args.
+ self.args = self._TranslateDeprecatedAliases(args)
+ self.ParseSubOpts()
+
+    # Namedtuple's public methods (such as _replace) start with an underscore.
+ # pylint: disable=protected-access
+ self.command_spec = self.command_spec._replace(
+ urls_start_arg=self._CalculateUrlsStartArg())
+
+ if (len(self.args) < self.command_spec.min_args
+ or len(self.args) > self.command_spec.max_args):
+ self.RaiseWrongNumberOfArgumentsException()
+
+ if self.command_name not in self._commands_with_subcommands_and_subopts:
+ self.CheckArguments()
+
+ # Build the support and default maps from the command spec.
+ support_map = {
+ 'gs': self.command_spec.gs_api_support,
+ 's3': [ApiSelector.XML]
+ }
+ default_map = {
+ 'gs': self.command_spec.gs_default_api,
+ 's3': ApiSelector.XML
+ }
+ self.gsutil_api_map = GsutilApiMapFactory.GetApiMap(
+ self.gsutil_api_class_map_factory, support_map, default_map)
+
+ self.project_id = None
+ self.gsutil_api = CloudApiDelegator(
+ bucket_storage_uri_class, self.gsutil_api_map,
+ self.logger, debug=self.debug)
+
+ # Cross-platform path to run gsutil binary.
+ self.gsutil_cmd = ''
+ # If running on Windows, invoke python interpreter explicitly.
+ if gslib.util.IS_WINDOWS:
+ self.gsutil_cmd += 'python '
+ # Add full path to gsutil to make sure we test the correct version.
+ self.gsutil_path = gslib.GSUTIL_PATH
+ self.gsutil_cmd += self.gsutil_path
+
+ # We're treating recursion_requested like it's used by all commands, but
+ # only some of the commands accept the -R option.
+ if self.sub_opts:
+ for o, unused_a in self.sub_opts:
+ if o == '-r' or o == '-R':
+ self.recursion_requested = True
+ break
+
+ self.multiprocessing_is_available = MultiprocessingIsAvailable()[0]
+
+ def RaiseWrongNumberOfArgumentsException(self):
+ """Raises exception for wrong number of arguments supplied to command."""
+ if len(self.args) < self.command_spec.min_args:
+ tail_str = 's' if self.command_spec.min_args > 1 else ''
+ message = ('The %s command requires at least %d argument%s.' %
+ (self.command_name, self.command_spec.min_args, tail_str))
+ else:
+ message = ('The %s command accepts at most %d arguments.' %
+ (self.command_name, self.command_spec.max_args))
+ message += ' Usage:\n%s\nFor additional help run:\n gsutil help %s' % (
+ self.command_spec.usage_synopsis, self.command_name)
+ raise CommandException(message)
+
+ def RaiseInvalidArgumentException(self):
+ """Raises exception for specifying an invalid argument to command."""
+ message = ('Incorrect option(s) specified. Usage:\n%s\n'
+ 'For additional help run:\n gsutil help %s' % (
+ self.command_spec.usage_synopsis, self.command_name))
+ raise CommandException(message)
+
+ def ParseSubOpts(self, check_args=False):
+ """Parses sub-opt args.
+
+ Args:
+ check_args: True to have CheckArguments() called after parsing.
+
+ Populates:
+ (self.sub_opts, self.args) from parsing.
+
+ Raises: RaiseInvalidArgumentException if invalid args specified.
+ """
+ try:
+ self.sub_opts, self.args = getopt.getopt(
+ self.args, self.command_spec.supported_sub_args,
+ self.command_spec.supported_private_args or [])
+ except getopt.GetoptError:
+ self.RaiseInvalidArgumentException()
+ if check_args:
+ self.CheckArguments()
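+
+  # Illustrative example of the getopt split performed above (the option string
+  # and arguments here are hypothetical, not tied to a real gsutil command):
+  #
+  #   getopt.getopt(['-R', '-p', 'my-proj', 'gs://bkt'], 'Rp:')
+  #     => ([('-R', ''), ('-p', 'my-proj')], ['gs://bkt'])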
+
+ def CheckArguments(self):
+ """Checks that command line arguments match the command_spec.
+
+ Any commands in self._commands_with_subcommands_and_subopts are responsible
+ for calling this method after handling initial parsing of their arguments.
+ This prevents commands with sub-commands as well as options from breaking
+ the parsing of getopt.
+
+ TODO: Provide a function to parse commands and sub-commands more
+ intelligently once we stop allowing the deprecated command versions.
+
+ Raises:
+ CommandException if the arguments don't match.
+ """
+
+ if (not self.command_spec.file_url_ok
+ and HaveFileUrls(self.args[self.command_spec.urls_start_arg:])):
+ raise CommandException('"%s" command does not support "file://" URLs. '
+ 'Did you mean to use a gs:// URL?' %
+ self.command_name)
+ if (not self.command_spec.provider_url_ok
+ and HaveProviderUrls(self.args[self.command_spec.urls_start_arg:])):
+ raise CommandException('"%s" command does not support provider-only '
+ 'URLs.' % self.command_name)
+
+ def WildcardIterator(self, url_string, all_versions=False):
+ """Helper to instantiate gslib.WildcardIterator.
+
+ Args are same as gslib.WildcardIterator interface, but this method fills in
+ most of the values from instance state.
+
+ Args:
+ url_string: URL string naming wildcard objects to iterate.
+ all_versions: If true, the iterator yields all versions of objects
+ matching the wildcard. If false, yields just the live
+ object version.
+
+ Returns:
+ WildcardIterator for use by caller.
+ """
+ return CreateWildcardIterator(
+ url_string, self.gsutil_api, all_versions=all_versions,
+ debug=self.debug, project_id=self.project_id)
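+
+  # Illustrative use within a subclass (the URL is made up); this mirrors how
+  # other helpers in this file consume the iterator:
+  #
+  #   for blr in self.WildcardIterator('gs://my-bucket').IterBuckets(
+  #       bucket_fields=['id']):
+  #     self.logger.info('Matched %s', blr.url_string)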
+
+ def RunCommand(self):
+ """Abstract function in base class. Subclasses must implement this.
+
+ The return value of this function will be used as the exit status of the
+ process, so subclass commands should return an integer exit code (0 for
+ success, a value in [1,255] for failure).
+ """
+ raise CommandException('Command %s is missing its RunCommand() '
+ 'implementation' % self.command_name)
+
+ ############################################################
+ # Shared helper functions that depend on base class state. #
+ ############################################################
+
+ def ApplyAclFunc(self, acl_func, acl_excep_handler, url_strs):
+ """Sets the standard or default object ACL depending on self.command_name.
+
+ Args:
+ acl_func: ACL function to be passed to Apply.
+ acl_excep_handler: ACL exception handler to be passed to Apply.
+ url_strs: URL strings on which to set ACL.
+
+ Raises:
+ CommandException if an ACL could not be set.
+ """
+ multi_threaded_url_args = []
+ # Handle bucket ACL setting operations single-threaded, because
+ # our threading machinery currently assumes it's working with objects
+ # (name_expansion_iterator), and normally we wouldn't expect users to need
+ # to set ACLs on huge numbers of buckets at once anyway.
+ for url_str in url_strs:
+ url = StorageUrlFromString(url_str)
+ if url.IsCloudUrl() and url.IsBucket():
+ if self.recursion_requested:
+ # If user specified -R option, convert any bucket args to bucket
+ # wildcards (e.g., gs://bucket/*), to prevent the operation from
+ # being applied to the buckets themselves.
+ url.object_name = '*'
+ multi_threaded_url_args.append(url.url_string)
+ else:
+ # Convert to a NameExpansionResult so we can re-use the threaded
+ # function for the single-threaded implementation. RefType is unused.
+ for blr in self.WildcardIterator(url.url_string).IterBuckets(
+ bucket_fields=['id']):
+ name_expansion_for_url = NameExpansionResult(
+ url, False, False, blr.storage_url)
+ acl_func(self, name_expansion_for_url)
+ else:
+ multi_threaded_url_args.append(url_str)
+
+ if len(multi_threaded_url_args) >= 1:
+ name_expansion_iterator = NameExpansionIterator(
+ self.command_name, self.debug,
+ self.logger, self.gsutil_api,
+ multi_threaded_url_args, self.recursion_requested,
+ all_versions=self.all_versions,
+ continue_on_error=self.continue_on_error or self.parallel_operations)
+
+ # Perform requests in parallel (-m) mode, if requested, using
+ # configured number of parallel processes and threads. Otherwise,
+ # perform requests with sequential function calls in current process.
+ self.Apply(acl_func, name_expansion_iterator, acl_excep_handler,
+ fail_on_error=not self.continue_on_error)
+
+ if not self.everything_set_okay and not self.continue_on_error:
+ raise CommandException('ACLs for some objects could not be set.')
+
+ def SetAclFunc(self, name_expansion_result, thread_state=None):
+ """Sets the object ACL for the name_expansion_result provided.
+
+ Args:
+ name_expansion_result: NameExpansionResult describing the target object.
+ thread_state: If present, use this gsutil Cloud API instance for the set.
+ """
+ if thread_state:
+ assert not self.def_acl
+ gsutil_api = thread_state
+ else:
+ gsutil_api = self.gsutil_api
+ op_string = 'default object ACL' if self.def_acl else 'ACL'
+ url = name_expansion_result.expanded_storage_url
+ self.logger.info('Setting %s on %s...', op_string, url)
+ if (gsutil_api.GetApiSelector(url.scheme) == ApiSelector.XML
+ and url.scheme != 'gs'):
+ # If we are called with a non-google ACL model, we need to use the XML
+ # passthrough. acl_arg should either be a canned ACL or an XML ACL.
+ self._SetAclXmlPassthrough(url, gsutil_api)
+ else:
+ # Normal Cloud API path. acl_arg is a JSON ACL or a canned ACL.
+ self._SetAclGsutilApi(url, gsutil_api)
+
+ def _SetAclXmlPassthrough(self, url, gsutil_api):
+ """Sets the ACL for the URL provided using the XML passthrough functions.
+
+ This function assumes that self.def_acl, self.canned,
+ and self.continue_on_error are initialized, and that self.acl_arg is
+ either an XML string or a canned ACL string.
+
+ Args:
+ url: CloudURL to set the ACL on.
+ gsutil_api: gsutil Cloud API to use for the ACL set. Must support XML
+ passthrough functions.
+ """
+ try:
+ orig_prefer_api = gsutil_api.prefer_api
+ gsutil_api.prefer_api = ApiSelector.XML
+ gsutil_api.XmlPassThroughSetAcl(
+ self.acl_arg, url, canned=self.canned,
+ def_obj_acl=self.def_acl, provider=url.scheme)
+ except ServiceException as e:
+ if self.continue_on_error:
+ self.everything_set_okay = False
+ self.logger.error(e)
+ else:
+ raise
+ finally:
+ gsutil_api.prefer_api = orig_prefer_api
+
+ def _SetAclGsutilApi(self, url, gsutil_api):
+ """Sets the ACL for the URL provided using the gsutil Cloud API.
+
+ This function assumes that self.def_acl, self.canned,
+ and self.continue_on_error are initialized, and that self.acl_arg is
+ either a JSON string or a canned ACL string.
+
+ Args:
+ url: CloudURL to set the ACL on.
+ gsutil_api: gsutil Cloud API to use for the ACL set.
+ """
+ try:
+ if url.IsBucket():
+ if self.def_acl:
+ if self.canned:
+ gsutil_api.PatchBucket(
+ url.bucket_name, apitools_messages.Bucket(),
+ canned_def_acl=self.acl_arg, provider=url.scheme, fields=['id'])
+ else:
+ def_obj_acl = AclTranslation.JsonToMessage(
+ self.acl_arg, apitools_messages.ObjectAccessControl)
+ bucket_metadata = apitools_messages.Bucket(
+ defaultObjectAcl=def_obj_acl)
+ gsutil_api.PatchBucket(url.bucket_name, bucket_metadata,
+ provider=url.scheme, fields=['id'])
+ else:
+ if self.canned:
+ gsutil_api.PatchBucket(
+ url.bucket_name, apitools_messages.Bucket(),
+ canned_acl=self.acl_arg, provider=url.scheme, fields=['id'])
+ else:
+ bucket_acl = AclTranslation.JsonToMessage(
+ self.acl_arg, apitools_messages.BucketAccessControl)
+ bucket_metadata = apitools_messages.Bucket(acl=bucket_acl)
+ gsutil_api.PatchBucket(url.bucket_name, bucket_metadata,
+ provider=url.scheme, fields=['id'])
+ else: # url.IsObject()
+ if self.canned:
+ gsutil_api.PatchObjectMetadata(
+ url.bucket_name, url.object_name, apitools_messages.Object(),
+ provider=url.scheme, generation=url.generation,
+ canned_acl=self.acl_arg)
+ else:
+ object_acl = AclTranslation.JsonToMessage(
+ self.acl_arg, apitools_messages.ObjectAccessControl)
+ object_metadata = apitools_messages.Object(acl=object_acl)
+ gsutil_api.PatchObjectMetadata(url.bucket_name, url.object_name,
+ object_metadata, provider=url.scheme,
+ generation=url.generation)
+ except ArgumentException, e:
+ raise
+ except ServiceException, e:
+ if self.continue_on_error:
+ self.everything_set_okay = False
+ self.logger.error(e)
+ else:
+ raise
+
+ def SetAclCommandHelper(self, acl_func, acl_excep_handler):
+ """Sets ACLs on the self.args using the passed-in acl function.
+
+ Args:
+ acl_func: ACL function to be passed to Apply.
+ acl_excep_handler: ACL exception handler to be passed to Apply.
+ """
+ acl_arg = self.args[0]
+ url_args = self.args[1:]
+ # Disallow multi-provider setacl requests, because there are differences in
+ # the ACL models.
+ if not UrlsAreForSingleProvider(url_args):
+ raise CommandException('"%s" command spanning providers not allowed.' %
+ self.command_name)
+
+ # Determine whether acl_arg names a file containing XML ACL text vs. the
+ # string name of a canned ACL.
+ if os.path.isfile(acl_arg):
+ with codecs.open(acl_arg, 'r', UTF8) as f:
+ acl_arg = f.read()
+ self.canned = False
+ else:
+ # No file exists, so expect a canned ACL string.
+ # Canned ACLs are not supported in JSON and we need to use the XML API
+ # to set them.
+ # validate=False because we allow wildcard urls.
+ storage_uri = boto.storage_uri(
+ url_args[0], debug=self.debug, validate=False,
+ bucket_storage_uri_class=self.bucket_storage_uri_class)
+
+ canned_acls = storage_uri.canned_acls()
+ if acl_arg not in canned_acls:
+ raise CommandException('Invalid canned ACL "%s".' % acl_arg)
+ self.canned = True
+
+ # Used to track if any ACLs failed to be set.
+ self.everything_set_okay = True
+ self.acl_arg = acl_arg
+
+ self.ApplyAclFunc(acl_func, acl_excep_handler, url_args)
+ if not self.everything_set_okay and not self.continue_on_error:
+ raise CommandException('ACLs for some objects could not be set.')
+
+ def _WarnServiceAccounts(self):
+ """Warns service account users who have received an AccessDenied error.
+
+    When one of the metadata-related commands fails due to AccessDenied, the
+    user must ensure that they are listed as an Owner in the API console.
+ """
+ # Import this here so that the value will be set first in
+ # gcs_oauth2_boto_plugin.
+ # pylint: disable=g-import-not-at-top
+ from gcs_oauth2_boto_plugin.oauth2_plugin import IS_SERVICE_ACCOUNT
+
+ if IS_SERVICE_ACCOUNT:
+ # This method is only called when canned ACLs are used, so the warning
+ # definitely applies.
+ self.logger.warning('\n'.join(textwrap.wrap(
+ 'It appears that your service account has been denied access while '
+ 'attempting to perform a metadata operation. If you believe that you '
+ 'should have access to this metadata (i.e., if it is associated with '
+          'your account), please make sure that your service account\'s email '
+ 'address is listed as an Owner in the Team tab of the API console. '
+ 'See "gsutil help creds" for further information.\n')))
+
+ def GetAndPrintAcl(self, url_str):
+ """Prints the standard or default object ACL depending on self.command_name.
+
+ Args:
+ url_str: URL string to get ACL for.
+ """
+ blr = self.GetAclCommandBucketListingReference(url_str)
+ url = StorageUrlFromString(url_str)
+ if (self.gsutil_api.GetApiSelector(url.scheme) == ApiSelector.XML
+ and url.scheme != 'gs'):
+ # Need to use XML passthrough.
+ try:
+ acl = self.gsutil_api.XmlPassThroughGetAcl(
+ url, def_obj_acl=self.def_acl, provider=url.scheme)
+ print acl.to_xml()
+ except AccessDeniedException, _:
+ self._WarnServiceAccounts()
+ raise
+ else:
+ if self.command_name == 'defacl':
+ acl = blr.root_object.defaultObjectAcl
+ if not acl:
+ self.logger.warn(
+ 'No default object ACL present for %s. This could occur if '
+ 'the default object ACL is private, in which case objects '
+ 'created in this bucket will be readable only by their '
+ 'creators. It could also mean you do not have OWNER permission '
+ 'on %s and therefore do not have permission to read the '
+ 'default object ACL.', url_str, url_str)
+ else:
+ acl = blr.root_object.acl
+ if not acl:
+ self._WarnServiceAccounts()
+ raise AccessDeniedException('Access denied. Please ensure you have '
+ 'OWNER permission on %s.' % url_str)
+ print AclTranslation.JsonFromMessage(acl)
+
+ def GetAclCommandBucketListingReference(self, url_str):
+ """Gets a single bucket listing reference for an acl get command.
+
+ Args:
+ url_str: URL string to get the bucket listing reference for.
+
+ Returns:
+ BucketListingReference for the URL string.
+
+ Raises:
+ CommandException if string did not result in exactly one reference.
+ """
+    # We're guaranteed by the caller that we have the appropriate type of URL
+    # string for the call (e.g., we will never be called with an object string
+    # by getdefacl).
+ wildcard_url = StorageUrlFromString(url_str)
+ if wildcard_url.IsObject():
+ plurality_iter = PluralityCheckableIterator(
+ self.WildcardIterator(url_str).IterObjects(
+ bucket_listing_fields=['acl']))
+ else:
+ # Bucket or provider. We call IterBuckets explicitly here to ensure that
+ # the root object is populated with the acl.
+ if self.command_name == 'defacl':
+ bucket_fields = ['defaultObjectAcl']
+ else:
+ bucket_fields = ['acl']
+ plurality_iter = PluralityCheckableIterator(
+ self.WildcardIterator(url_str).IterBuckets(
+ bucket_fields=bucket_fields))
+ if plurality_iter.IsEmpty():
+ raise CommandException('No URLs matched')
+ if plurality_iter.HasPlurality():
+ raise CommandException(
+ '%s matched more than one URL, which is not allowed by the %s '
+ 'command' % (url_str, self.command_name))
+ return list(plurality_iter)[0]
+
+ def _HandleMultiProcessingSigs(self, unused_signal_num,
+ unused_cur_stack_frame):
+ """Handles signals INT AND TERM during a multi-process/multi-thread request.
+
+ Kills subprocesses.
+
+ Args:
+ unused_signal_num: signal generated by ^C.
+ unused_cur_stack_frame: Current stack frame.
+ """
+ # Note: This only works under Linux/MacOS. See
+ # https://github.com/GoogleCloudPlatform/gsutil/issues/99 for details
+    # about why making it work correctly across operating systems is harder
+    # and remains an open issue.
+ ShutDownGsutil()
+ sys.stderr.write('Caught ^C - exiting\n')
+ # Simply calling sys.exit(1) doesn't work - see above bug for details.
+ KillProcess(os.getpid())
+
+ def GetSingleBucketUrlFromArg(self, arg, bucket_fields=None):
+ """Gets a single bucket URL based on the command arguments.
+
+ Args:
+ arg: String argument to get bucket URL for.
+ bucket_fields: Fields to populate for the bucket.
+
+ Returns:
+ (StorageUrl referring to a single bucket, Bucket metadata).
+
+ Raises:
+ CommandException if args did not match exactly one bucket.
+ """
+ plurality_checkable_iterator = self.GetBucketUrlIterFromArg(
+ arg, bucket_fields=bucket_fields)
+ if plurality_checkable_iterator.HasPlurality():
+ raise CommandException(
+ '%s matched more than one URL, which is not\n'
+ 'allowed by the %s command' % (arg, self.command_name))
+ blr = list(plurality_checkable_iterator)[0]
+ return StorageUrlFromString(blr.url_string), blr.root_object
+
+ def GetBucketUrlIterFromArg(self, arg, bucket_fields=None):
+ """Gets a single bucket URL based on the command arguments.
+
+ Args:
+ arg: String argument to iterate over.
+ bucket_fields: Fields to populate for the bucket.
+
+ Returns:
+ PluralityCheckableIterator over buckets.
+
+ Raises:
+ CommandException if iterator matched no buckets.
+ """
+ arg_url = StorageUrlFromString(arg)
+ if not arg_url.IsCloudUrl() or arg_url.IsObject():
+ raise CommandException('"%s" command must specify a bucket' %
+ self.command_name)
+
+ plurality_checkable_iterator = PluralityCheckableIterator(
+ self.WildcardIterator(arg).IterBuckets(
+ bucket_fields=bucket_fields))
+ if plurality_checkable_iterator.IsEmpty():
+ raise CommandException('No URLs matched')
+ return plurality_checkable_iterator
+
+ ######################
+ # Private functions. #
+ ######################
+
+ def _ResetConnectionPool(self):
+ # Each OS process needs to establish its own set of connections to
+ # the server to avoid writes from different OS processes interleaving
+ # onto the same socket (and garbling the underlying SSL session).
+ # We ensure each process gets its own set of connections here by
+ # closing all connections in the storage provider connection pool.
+ connection_pool = StorageUri.provider_pool
+ if connection_pool:
+ for i in connection_pool:
+ connection_pool[i].connection.close()
+
+ def _GetProcessAndThreadCount(self, process_count, thread_count,
+ parallel_operations_override):
+ """Determines the values of process_count and thread_count.
+
+ These values are used for parallel operations.
+ If we're not performing operations in parallel, then ignore
+ existing values and use process_count = thread_count = 1.
+
+ Args:
+ process_count: A positive integer or None. In the latter case, we read
+ the value from the .boto config file.
+ thread_count: A positive integer or None. In the latter case, we read
+ the value from the .boto config file.
+ parallel_operations_override: Used to override self.parallel_operations.
+ This allows the caller to safely override
+ the top-level flag for a single call.
+
+ Returns:
+ (process_count, thread_count): The number of processes and threads to use,
+ respectively.
+ """
+ # Set OS process and python thread count as a function of options
+ # and config.
+ if self.parallel_operations or parallel_operations_override:
+ if not process_count:
+ process_count = boto.config.getint(
+ 'GSUtil', 'parallel_process_count',
+ gslib.commands.config.DEFAULT_PARALLEL_PROCESS_COUNT)
+ if process_count < 1:
+ raise CommandException('Invalid parallel_process_count "%d".' %
+ process_count)
+ if not thread_count:
+ thread_count = boto.config.getint(
+ 'GSUtil', 'parallel_thread_count',
+ gslib.commands.config.DEFAULT_PARALLEL_THREAD_COUNT)
+ if thread_count < 1:
+ raise CommandException('Invalid parallel_thread_count "%d".' %
+ thread_count)
+ else:
+ # If -m not specified, then assume 1 OS process and 1 Python thread.
+ process_count = 1
+ thread_count = 1
+
+ if IS_WINDOWS and process_count > 1:
+ raise CommandException('\n'.join(textwrap.wrap(
+ ('It is not possible to set process_count > 1 on Windows. Please '
+ 'update your config file (located at %s) and set '
+ '"parallel_process_count = 1".') %
+ GetConfigFilePath())))
+ self.logger.debug('process count: %d', process_count)
+ self.logger.debug('thread count: %d', thread_count)
+
+ return (process_count, thread_count)
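+
+  # Illustrative .boto configuration consumed above (the section and option
+  # names are those read via boto.config; the values shown are arbitrary):
+  #
+  #   [GSUtil]
+  #   parallel_process_count = 12
+  #   parallel_thread_count = 10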
+
+ def _SetUpPerCallerState(self):
+ """Set up the state for a caller id, corresponding to one Apply call."""
+ # Get a new caller ID.
+ with caller_id_lock:
+ caller_id_counter.value += 1
+ caller_id = caller_id_counter.value
+
+ # Create a copy of self with an incremented recursive level. This allows
+ # the class to report its level correctly if the function called from it
+ # also needs to call Apply.
+ cls = copy.copy(self)
+ cls.recursive_apply_level += 1
+
+ # Thread-safe loggers can't be pickled, so we will remove it here and
+ # recreate it later in the WorkerThread. This is not a problem since any
+ # logger with the same name will be treated as a singleton.
+ cls.logger = None
+
+ # Likewise, the default API connection can't be pickled, but it is unused
+ # anyway as each thread gets its own API delegator.
+ cls.gsutil_api = None
+
+ class_map[caller_id] = cls
+ total_tasks[caller_id] = -1 # -1 => the producer hasn't finished yet.
+ call_completed_map[caller_id] = False
+ caller_id_finished_count.Put(caller_id, 0)
+ global_return_values_map.Put(caller_id, [])
+ return caller_id
+
+ def _CreateNewConsumerPool(self, num_processes, num_threads):
+ """Create a new pool of processes that call _ApplyThreads."""
+ processes = []
+ task_queue = _NewMultiprocessingQueue()
+ task_queues.append(task_queue)
+
+ current_max_recursive_level.value += 1
+ if current_max_recursive_level.value > MAX_RECURSIVE_DEPTH:
+ raise CommandException('Recursion depth of Apply calls is too great.')
+ for _ in range(num_processes):
+ recursive_apply_level = len(consumer_pools)
+ p = multiprocessing.Process(
+ target=self._ApplyThreads,
+ args=(num_threads, num_processes, recursive_apply_level))
+ p.daemon = True
+ processes.append(p)
+ p.start()
+ consumer_pool = _ConsumerPool(processes, task_queue)
+ consumer_pools.append(consumer_pool)
+
+ def Apply(self, func, args_iterator, exception_handler,
+ shared_attrs=None, arg_checker=_UrlArgChecker,
+ parallel_operations_override=False, process_count=None,
+ thread_count=None, should_return_results=False,
+ fail_on_error=False):
+ """Calls _Parallel/SequentialApply based on multiprocessing availability.
+
+ Args:
+ func: Function to call to process each argument.
+ args_iterator: Iterable collection of arguments to be put into the
+ work queue.
+ exception_handler: Exception handler for WorkerThread class.
+ shared_attrs: List of attributes to manage across sub-processes.
+ arg_checker: Used to determine whether we should process the current
+ argument or simply skip it. Also handles any logging that
+ is specific to a particular type of argument.
+ parallel_operations_override: Used to override self.parallel_operations.
+ This allows the caller to safely override
+ the top-level flag for a single call.
+ process_count: The number of processes to use. If not specified, then
+ the configured default will be used.
+      thread_count: The number of threads per process. If not specified, then
+                    the configured default will be used.
+ should_return_results: If true, then return the results of all successful
+ calls to func in a list.
+ fail_on_error: If true, then raise any exceptions encountered when
+ executing func. This is only applicable in the case of
+ process_count == thread_count == 1.
+
+ Returns:
+ Results from spawned threads.
+ """
+ if shared_attrs:
+ original_shared_vars_values = {} # We'll add these back in at the end.
+ for name in shared_attrs:
+ original_shared_vars_values[name] = getattr(self, name)
+ # By setting this to 0, we simplify the logic for computing deltas.
+ # We'll add it back after all of the tasks have been performed.
+ setattr(self, name, 0)
+
+ (process_count, thread_count) = self._GetProcessAndThreadCount(
+ process_count, thread_count, parallel_operations_override)
+
+ is_main_thread = (self.recursive_apply_level == 0
+ and self.sequential_caller_id == -1)
+
+ # We don't honor the fail_on_error flag in the case of multiple threads
+ # or processes.
+ fail_on_error = fail_on_error and (process_count * thread_count == 1)
+
+ # Only check this from the first call in the main thread. Apart from the
+ # fact that it's wasteful to try this multiple times in general, it also
+ # will never work when called from a subprocess since we use daemon
+ # processes, and daemons can't create other processes.
+ if is_main_thread:
+ if ((not self.multiprocessing_is_available)
+ and thread_count * process_count > 1):
+ # Run the check again and log the appropriate warnings. This was run
+ # before, when the Command object was created, in order to calculate
+ # self.multiprocessing_is_available, but we don't want to print the
+ # warning until we're sure the user actually tried to use multiple
+ # threads or processes.
+ MultiprocessingIsAvailable(logger=self.logger)
+
+ if self.multiprocessing_is_available:
+ caller_id = self._SetUpPerCallerState()
+ else:
+ self.sequential_caller_id += 1
+ caller_id = self.sequential_caller_id
+
+ if is_main_thread:
+ # pylint: disable=global-variable-undefined
+ global global_return_values_map, shared_vars_map, failure_count
+ global caller_id_finished_count, shared_vars_list_map
+ global_return_values_map = BasicIncrementDict()
+ global_return_values_map.Put(caller_id, [])
+ shared_vars_map = BasicIncrementDict()
+ caller_id_finished_count = BasicIncrementDict()
+ shared_vars_list_map = {}
+ failure_count = 0
+
+ # If any shared attributes passed by caller, create a dictionary of
+ # shared memory variables for every element in the list of shared
+ # attributes.
+ if shared_attrs:
+ shared_vars_list_map[caller_id] = shared_attrs
+ for name in shared_attrs:
+ shared_vars_map.Put((caller_id, name), 0)
+
+ # Make all of the requested function calls.
+ if self.multiprocessing_is_available and thread_count * process_count > 1:
+ self._ParallelApply(func, args_iterator, exception_handler, caller_id,
+ arg_checker, process_count, thread_count,
+ should_return_results, fail_on_error)
+ else:
+ self._SequentialApply(func, args_iterator, exception_handler, caller_id,
+ arg_checker, should_return_results, fail_on_error)
+
+ if shared_attrs:
+ for name in shared_attrs:
+ # This allows us to retain the original value of the shared variable,
+ # and simply apply the delta after what was done during the call to
+ # apply.
+ final_value = (original_shared_vars_values[name] +
+ shared_vars_map.Get((caller_id, name)))
+ setattr(self, name, final_value)
+
+ if should_return_results:
+ return global_return_values_map.Get(caller_id)
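+
+  # Illustrative call shape (hypothetical subclass code, not taken from this
+  # file): a command typically hands Apply a worker function, a name expansion
+  # iterator, and an exception handler, optionally aggregating counters via
+  # shared_attrs:
+  #
+  #   self.Apply(_ProcessOneUrl, name_expansion_iterator,
+  #              _DefaultExceptionHandler, shared_attrs=['op_failure_count'],
+  #              fail_on_error=not self.continue_on_error)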
+
+ def _MaybeSuggestGsutilDashM(self):
+ """Outputs a sugestion to the user to use gsutil -m."""
+ if not (boto.config.getint('GSUtil', 'parallel_process_count', 0) == 1 and
+ boto.config.getint('GSUtil', 'parallel_thread_count', 0) == 1):
+ self.logger.info('\n' + textwrap.fill(
+ '==> NOTE: You are performing a sequence of gsutil operations that '
+ 'may run significantly faster if you instead use gsutil -m %s ...\n'
+ 'Please see the -m section under "gsutil help options" for further '
+ 'information about when gsutil -m can be advantageous.'
+ % sys.argv[1]) + '\n')
+
+ # pylint: disable=g-doc-args
+ def _SequentialApply(self, func, args_iterator, exception_handler, caller_id,
+ arg_checker, should_return_results, fail_on_error):
+ """Performs all function calls sequentially in the current thread.
+
+ No other threads or processes will be spawned. This degraded functionality
+ is used when the multiprocessing module is not available or the user
+ requests only one thread and one process.
+ """
+ # Create a WorkerThread to handle all of the logic needed to actually call
+ # the function. Note that this thread will never be started, and all work
+ # is done in the current thread.
+ worker_thread = WorkerThread(None, False)
+ args_iterator = iter(args_iterator)
+ # Count of sequential calls that have been made. Used for producing
+ # suggestion to use gsutil -m.
+ sequential_call_count = 0
+ while True:
+
+ # Try to get the next argument, handling any exceptions that arise.
+ try:
+ args = args_iterator.next()
+ except StopIteration, e:
+ break
+ except Exception, e: # pylint: disable=broad-except
+ _IncrementFailureCount()
+ if fail_on_error:
+ raise
+ else:
+ try:
+ exception_handler(self, e)
+ except Exception, _: # pylint: disable=broad-except
+ self.logger.debug(
+ 'Caught exception while handling exception for %s:\n%s',
+ func, traceback.format_exc())
+ continue
+
+ sequential_call_count += 1
+ if sequential_call_count == OFFER_GSUTIL_M_SUGGESTION_THRESHOLD:
+ # Output suggestion near beginning of run, so user sees it early and can
+ # ^C and try gsutil -m.
+ self._MaybeSuggestGsutilDashM()
+ if arg_checker(self, args):
+ # Now that we actually have the next argument, perform the task.
+ task = Task(func, args, caller_id, exception_handler,
+ should_return_results, arg_checker, fail_on_error)
+ worker_thread.PerformTask(task, self)
+ if sequential_call_count >= gslib.util.GetTermLines():
+ # Output suggestion at end of long run, in case user missed it at the
+ # start and it scrolled off-screen.
+ self._MaybeSuggestGsutilDashM()
+
+ # pylint: disable=g-doc-args
+ def _ParallelApply(self, func, args_iterator, exception_handler, caller_id,
+ arg_checker, process_count, thread_count,
+ should_return_results, fail_on_error):
+ """Dispatches input arguments across a thread/process pool.
+
+ Pools are composed of parallel OS processes and/or Python threads,
+ based on options (-m or not) and settings in the user's config file.
+
+ If only one OS process is requested/available, dispatch requests across
+ threads in the current OS process.
+
+ In the multi-process case, we will create one pool of worker processes for
+ each level of the tree of recursive calls to Apply. E.g., if A calls
+ Apply(B), and B ultimately calls Apply(C) followed by Apply(D), then we
+ will only create two sets of worker processes - B will execute in the first,
+ and C and D will execute in the second. If C is then changed to call
+ Apply(E) and D is changed to call Apply(F), then we will automatically
+ create a third set of processes (lazily, when needed) that will be used to
+ execute calls to E and F. This might look something like:
+
+ Pool1 Executes: B
+ / \
+ Pool2 Executes: C D
+ / \
+ Pool3 Executes: E F
+
+ Apply's parallelism is generally broken up into 4 cases:
+ - If process_count == thread_count == 1, then all tasks will be executed
+ by _SequentialApply.
+ - If process_count > 1 and thread_count == 1, then the main thread will
+ create a new pool of processes (if they don't already exist) and each of
+ those processes will execute the tasks in a single thread.
+ - If process_count == 1 and thread_count > 1, then this process will create
+ a new pool of threads to execute the tasks.
+ - If process_count > 1 and thread_count > 1, then the main thread will
+ create a new pool of processes (if they don't already exist) and each of
+ those processes will, upon creation, create a pool of threads to
+ execute the tasks.
+
+ Args:
+ caller_id: The caller ID unique to this call to command.Apply.
+ See command.Apply for description of other arguments.
+ """
+ is_main_thread = self.recursive_apply_level == 0
+
+ # Catch SIGINT and SIGTERM under Linux/MacOs so we can do cleanup before
+ # exiting.
+ if not IS_WINDOWS and is_main_thread:
+ # Register as a final signal handler because this handler kills the
+ # main gsutil process (so it must run last).
+ RegisterSignalHandler(signal.SIGINT, self._HandleMultiProcessingSigs,
+ is_final_handler=True)
+ RegisterSignalHandler(signal.SIGTERM, self._HandleMultiProcessingSigs,
+ is_final_handler=True)
+
+ if not task_queues:
+ # The process we create will need to access the next recursive level
+ # of task queues if it makes a call to Apply, so we always keep around
+ # one more queue than we know we need. OTOH, if we don't create a new
+ # process, the existing process still needs a task queue to use.
+ task_queues.append(_NewMultiprocessingQueue())
+
+ if process_count > 1: # Handle process pool creation.
+ # Check whether this call will need a new set of workers.
+
+ # Each worker must acquire a shared lock before notifying the main thread
+ # that it needs a new worker pool, so that at most one worker asks for
+ # a new worker pool at once.
+ try:
+ if not is_main_thread:
+ worker_checking_level_lock.acquire()
+ if self.recursive_apply_level >= current_max_recursive_level.value:
+ with need_pool_or_done_cond:
+ # Only the main thread is allowed to create new processes -
+ # otherwise, we will run into some Python bugs.
+ if is_main_thread:
+ self._CreateNewConsumerPool(process_count, thread_count)
+ else:
+ # Notify the main thread that we need a new consumer pool.
+ new_pool_needed.value = 1
+ need_pool_or_done_cond.notify_all()
+ # The main thread will notify us when it finishes.
+ need_pool_or_done_cond.wait()
+ finally:
+ if not is_main_thread:
+ worker_checking_level_lock.release()
+
+    # When process_count == 1, tasks are consumed within this process, so we
+    # create a separate task queue; if Apply has already been called with
+    # process_count > 1, existing consumer pools would otherwise consume from
+    # the shared queue and steal our tasks.
+ if process_count > 1:
+ task_queue = task_queues[self.recursive_apply_level]
+ else:
+ task_queue = _NewMultiprocessingQueue()
+
+ # Kick off a producer thread to throw tasks in the global task queue. We
+ # do this asynchronously so that the main thread can be free to create new
+ # consumer pools when needed (otherwise, any thread with a task that needs
+ # a new consumer pool must block until we're completely done producing; in
+ # the worst case, every worker blocks on such a call and the producer fills
+ # up the task queue before it finishes, so we block forever).
+ producer_thread = ProducerThread(copy.copy(self), args_iterator, caller_id,
+ func, task_queue, should_return_results,
+ exception_handler, arg_checker,
+ fail_on_error)
+
+ if process_count > 1:
+ # Wait here until either:
+ # 1. We're the main thread and someone needs a new consumer pool - in
+ # which case we create one and continue waiting.
+ # 2. Someone notifies us that all of the work we requested is done, in
+ # which case we retrieve the results (if applicable) and stop
+ # waiting.
+ while True:
+ with need_pool_or_done_cond:
+ # Either our call is done, or someone needs a new level of consumer
+          # pools, or the wakeup call was meant for someone else. It's
+ # impossible for both conditions to be true, since the main thread is
+ # blocked on any other ongoing calls to Apply, and a thread would not
+ # ask for a new consumer pool unless it had more work to do.
+ if call_completed_map[caller_id]:
+ break
+ elif is_main_thread and new_pool_needed.value:
+ new_pool_needed.value = 0
+ self._CreateNewConsumerPool(process_count, thread_count)
+ need_pool_or_done_cond.notify_all()
+
+ # Note that we must check the above conditions before the wait() call;
+ # otherwise, the notification can happen before we start waiting, in
+ # which case we'll block forever.
+ need_pool_or_done_cond.wait()
+ else: # Using a single process.
+ self._ApplyThreads(thread_count, process_count,
+ self.recursive_apply_level,
+ is_blocking_call=True, task_queue=task_queue)
+
+    # If the producer thread encountered an exception before any arguments
+    # were enqueued, it won't have been propagated, so explicitly raise it
+    # here.
+ if producer_thread.unknown_exception:
+ # pylint: disable=raising-bad-type
+ raise producer_thread.unknown_exception
+
+    # If the producer thread encountered an exception while iterating over the
+    # arguments, raise it here if we're meant to fail on error.
+ if producer_thread.iterator_exception and fail_on_error:
+ # pylint: disable=raising-bad-type
+ raise producer_thread.iterator_exception
+
+ def _ApplyThreads(self, thread_count, process_count, recursive_apply_level,
+ is_blocking_call=False, task_queue=None):
+ """Assigns the work from the multi-process global task queue.
+
+ Work is assigned to an individual process for later consumption either by
+ the WorkerThreads or (if thread_count == 1) this thread.
+
+ Args:
+ thread_count: The number of threads used to perform the work. If 1, then
+ perform all work in this thread.
+ process_count: The number of processes used to perform the work.
+ recursive_apply_level: The depth in the tree of recursive calls to Apply
+ of this thread.
+ is_blocking_call: True iff the call to Apply is blocked on this call
+ (which is true iff process_count == 1), implying that
+ _ApplyThreads must behave as a blocking call.
+ """
+ self._ResetConnectionPool()
+ self.recursive_apply_level = recursive_apply_level
+
+ task_queue = task_queue or task_queues[recursive_apply_level]
+
+ assert thread_count * process_count > 1, (
+ 'Invalid state, calling command._ApplyThreads with only one thread '
+ 'and process.')
+ worker_pool = WorkerPool(
+ thread_count, self.logger,
+ bucket_storage_uri_class=self.bucket_storage_uri_class,
+ gsutil_api_map=self.gsutil_api_map, debug=self.debug)
+
+ num_enqueued = 0
+ while True:
+ task = task_queue.get()
+ if task.args != ZERO_TASKS_TO_DO_ARGUMENT:
+ # If we have no tasks to do and we're performing a blocking call, we
+ # need a special signal to tell us to stop - otherwise, we block on
+ # the call to task_queue.get() forever.
+ worker_pool.AddTask(task)
+ num_enqueued += 1
+
+ if is_blocking_call:
+ num_to_do = total_tasks[task.caller_id]
+ # The producer thread won't enqueue the last task until after it has
+ # updated total_tasks[caller_id], so we know that num_to_do < 0 implies
+ # we will do this check again.
+ if num_to_do >= 0 and num_enqueued == num_to_do:
+ if thread_count == 1:
+ return
+ else:
+ while True:
+ with need_pool_or_done_cond:
+ if call_completed_map[task.caller_id]:
+ # We need to check this first, in case the condition was
+ # notified before we grabbed the lock.
+ return
+ need_pool_or_done_cond.wait()
+
+
+# Below here lie classes and functions related to controlling the flow of tasks
+# between various threads and processes.
+
+
+class _ConsumerPool(object):
+
+ def __init__(self, processes, task_queue):
+ self.processes = processes
+ self.task_queue = task_queue
+
+ def ShutDown(self):
+ for process in self.processes:
+ KillProcess(process.pid)
+
+
+def KillProcess(pid):
+ """Make best effort to kill the given process.
+
+ We ignore all exceptions so a caller looping through a list of processes will
+ continue attempting to kill each, even if one encounters a problem.
+
+ Args:
+ pid: The process ID.
+ """
+ try:
+    # os.kill is unavailable on Windows before Python 2.7 (and before 3.2 on
+    # the 3.x line), so fall back to the Win32 TerminateProcess API there.
+ if IS_WINDOWS and ((2, 6) <= sys.version_info[:3] < (2, 7) or
+ (3, 0) <= sys.version_info[:3] < (3, 2)):
+ kernel32 = ctypes.windll.kernel32
+ handle = kernel32.OpenProcess(1, 0, pid)
+ kernel32.TerminateProcess(handle, 0)
+ else:
+ os.kill(pid, signal.SIGKILL)
+ except: # pylint: disable=bare-except
+ pass
+
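As a usage sketch only (assuming KillProcess is importable from this module; the _Spin child function is invented for illustration), a caller can sweep a list of child processes and make a best-effort attempt on each, relying on the bare except above to keep the loop going:

    import multiprocessing
    import time

    def _Spin():  # Hypothetical long-running child process.
      while True:
        time.sleep(1)

    if __name__ == '__main__':
      children = [multiprocessing.Process(target=_Spin) for _ in range(2)]
      for child in children:
        child.start()
      for child in children:
        KillProcess(child.pid)  # Best effort; any per-process failure is swallowed.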
+
+class Task(namedtuple('Task', (
+ 'func args caller_id exception_handler should_return_results arg_checker '
+ 'fail_on_error'))):
+ """Task class representing work to be completed.
+
+ Args:
+ func: The function to be executed.
+ args: The arguments to func.
+ caller_id: The globally-unique caller ID corresponding to the Apply call.
+ exception_handler: The exception handler to use if the call to func fails.
+ should_return_results: True iff the results of this function should be
+ returned from the Apply call.
+ arg_checker: Used to determine whether we should process the current
+ argument or simply skip it. Also handles any logging that
+ is specific to a particular type of argument.
+ fail_on_error: If true, then raise any exceptions encountered when
+ executing func. This is only applicable in the case of
+ process_count == thread_count == 1.
+ """
+ pass
+
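The Task subclass exists only to attach the docstring above; instances behave like any namedtuple. A hypothetical construction (the _PrintUrl function and its arguments are invented for illustration):

    def _PrintUrl(cls, url, thread_state=None):  # Hypothetical work function.
      cls.logger.info(url)

    task = Task(func=_PrintUrl, args='gs://some-bucket/obj', caller_id=1,
                exception_handler=None, should_return_results=False,
                arg_checker=None, fail_on_error=False)
    assert task.args == 'gs://some-bucket/obj'  # Fields are readable by name.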
+
+class ProducerThread(threading.Thread):
+ """Thread used to enqueue work for other processes and threads."""
+
+ def __init__(self, cls, args_iterator, caller_id, func, task_queue,
+ should_return_results, exception_handler, arg_checker,
+ fail_on_error):
+ """Initializes the producer thread.
+
+ Args:
+ cls: Instance of Command for which this ProducerThread was created.
+ args_iterator: Iterable collection of arguments to be put into the
+ work queue.
+ caller_id: Globally-unique caller ID corresponding to this call to Apply.
+ func: The function to be called on each element of args_iterator.
+ task_queue: The queue into which tasks will be put, to later be consumed
+ by Command._ApplyThreads.
+ should_return_results: True iff the results for this call to command.Apply
+ were requested.
+ exception_handler: The exception handler to use when errors are
+ encountered during calls to func.
+ arg_checker: Used to determine whether we should process the current
+ argument or simply skip it. Also handles any logging that
+ is specific to a particular type of argument.
+ fail_on_error: If true, then raise any exceptions encountered when
+ executing func. This is only applicable in the case of
+ process_count == thread_count == 1.
+ """
+ super(ProducerThread, self).__init__()
+ self.func = func
+ self.cls = cls
+ self.args_iterator = args_iterator
+ self.caller_id = caller_id
+ self.task_queue = task_queue
+ self.arg_checker = arg_checker
+ self.exception_handler = exception_handler
+ self.should_return_results = should_return_results
+ self.fail_on_error = fail_on_error
+ self.shared_variables_updater = _SharedVariablesUpdater()
+ self.daemon = True
+ self.unknown_exception = None
+ self.iterator_exception = None
+ self.start()
+
+ def run(self):
+ num_tasks = 0
+ cur_task = None
+ last_task = None
+ try:
+ args_iterator = iter(self.args_iterator)
+ while True:
+ try:
+ args = args_iterator.next()
+ except StopIteration, e:
+ break
+ except Exception, e: # pylint: disable=broad-except
+ _IncrementFailureCount()
+ if self.fail_on_error:
+ self.iterator_exception = e
+ raise
+ else:
+ try:
+ self.exception_handler(self.cls, e)
+ except Exception, _: # pylint: disable=broad-except
+ self.cls.logger.debug(
+ 'Caught exception while handling exception for %s:\n%s',
+ self.func, traceback.format_exc())
+ self.shared_variables_updater.Update(self.caller_id, self.cls)
+ continue
+
+ if self.arg_checker(self.cls, args):
+ num_tasks += 1
+ last_task = cur_task
+ cur_task = Task(self.func, args, self.caller_id,
+ self.exception_handler, self.should_return_results,
+ self.arg_checker, self.fail_on_error)
+ if last_task:
+ self.task_queue.put(last_task)
+ except Exception, e: # pylint: disable=broad-except
+ # This will also catch any exception raised due to an error in the
+ # iterator when fail_on_error is set, so check that we failed for some
+ # other reason before claiming that we had an unknown exception.
+ if not self.iterator_exception:
+ self.unknown_exception = e
+ finally:
+      # We must update total_tasks[caller_id] before enqueuing the last task.
+      # Otherwise, a worker could retrieve and complete the last task, then
+      # consult total_tasks before we have updated it and be unable to tell
+      # that production is finished. Holding back the last task until after
+      # the update prevents that race.
+ total_tasks[self.caller_id] = num_tasks
+ if not cur_task:
+ # This happens if there were zero arguments to be put in the queue.
+ cur_task = Task(None, ZERO_TASKS_TO_DO_ARGUMENT, self.caller_id,
+ None, None, None, None)
+ self.task_queue.put(cur_task)
+
+ # It's possible that the workers finished before we updated total_tasks,
+ # so we need to check here as well.
+ _NotifyIfDone(self.caller_id,
+ caller_id_finished_count.Get(self.caller_id))
+
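The run() method above deliberately lags one task behind the iterator: the newest task is held back so that the final task is only enqueued after total_tasks[caller_id] has been published. A stripped-down sketch of just that ordering (the queue and dict here are local stand-ins for the module's globals):

    import Queue  # This file targets Python 2; on Python 3 this module is 'queue'.

    task_queue_sketch = Queue.Queue()
    total_tasks_sketch = {}

    def Produce(items, caller_id):
      cur = last = None
      count = 0
      for item in items:
        count += 1
        last, cur = cur, item
        if last is not None:
          task_queue_sketch.put(last)  # Everything except the newest task.
      total_tasks_sketch[caller_id] = count  # Publish the total first...
      if cur is not None:
        task_queue_sketch.put(cur)  # ...then release the held-back last task.

    Produce(['a', 'b', 'c'], caller_id=1)  # Enqueues 'a', 'b', the count, then 'c'.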
+
+class WorkerPool(object):
+ """Pool of worker threads to which tasks can be added."""
+
+ def __init__(self, thread_count, logger, bucket_storage_uri_class=None,
+ gsutil_api_map=None, debug=0):
+ self.task_queue = _NewThreadsafeQueue()
+ self.threads = []
+ for _ in range(thread_count):
+ worker_thread = WorkerThread(
+ self.task_queue, logger,
+ bucket_storage_uri_class=bucket_storage_uri_class,
+ gsutil_api_map=gsutil_api_map, debug=debug)
+ self.threads.append(worker_thread)
+ worker_thread.start()
+
+ def AddTask(self, task):
+ self.task_queue.put(task)
+
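WorkerPool is a thin wrapper around daemon WorkerThreads sharing one thread-safe queue. The generic shape of that arrangement, reduced to a sketch with invented names and none of the gsutil-specific error handling:

    import Queue
    import threading

    def _Drain(q):
      while True:
        job = q.get()
        job()  # A real worker wraps this call in error handling.

    jobs = Queue.Queue()
    for _ in range(4):
      worker = threading.Thread(target=_Drain, args=(jobs,))
      worker.daemon = True  # Like WorkerThread, don't block interpreter exit.
      worker.start()

    jobs.put(lambda: None)  # Analogous to WorkerPool.AddTask.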
+
+class WorkerThread(threading.Thread):
+ """Thread where all the work will be performed.
+
+ This makes the function calls for Apply and takes care of all error handling,
+ return value propagation, and shared_vars.
+
+ Note that this thread is NOT started upon instantiation because the function-
+ calling logic is also used in the single-threaded case.
+ """
+
+ def __init__(self, task_queue, logger, bucket_storage_uri_class=None,
+ gsutil_api_map=None, debug=0):
+ """Initializes the worker thread.
+
+ Args:
+ task_queue: The thread-safe queue from which this thread should obtain
+ its work.
+ logger: Logger to use for this thread.
+ bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
+ Settable for testing/mocking.
+ gsutil_api_map: Map of providers and API selector tuples to api classes
+ which can be used to communicate with those providers.
+                      Used when instantiating the CloudApiDelegator class.
+      debug: Debug level for the CloudApiDelegator class.
+ """
+ super(WorkerThread, self).__init__()
+ self.task_queue = task_queue
+ self.daemon = True
+ self.cached_classes = {}
+ self.shared_vars_updater = _SharedVariablesUpdater()
+
+ self.thread_gsutil_api = None
+ if bucket_storage_uri_class and gsutil_api_map:
+ self.thread_gsutil_api = CloudApiDelegator(
+ bucket_storage_uri_class, gsutil_api_map, logger, debug=debug)
+
+ def PerformTask(self, task, cls):
+ """Makes the function call for a task.
+
+ Args:
+ task: The Task to perform.
+ cls: The instance of a class which gives context to the functions called
+ by the Task's function. E.g., see SetAclFuncWrapper.
+ """
+ caller_id = task.caller_id
+ try:
+ results = task.func(cls, task.args, thread_state=self.thread_gsutil_api)
+ if task.should_return_results:
+ global_return_values_map.Update(caller_id, [results], default_value=[])
+ except Exception, e: # pylint: disable=broad-except
+ _IncrementFailureCount()
+ if task.fail_on_error:
+ raise # Only happens for single thread and process case.
+ else:
+ try:
+ task.exception_handler(cls, e)
+ except Exception, _: # pylint: disable=broad-except
+ # Don't allow callers to raise exceptions here and kill the worker
+ # threads.
+ cls.logger.debug(
+ 'Caught exception while handling exception for %s:\n%s',
+ task, traceback.format_exc())
+ finally:
+ self.shared_vars_updater.Update(caller_id, cls)
+
+      # Even if we encounter an exception, we still need to claim that the
+      # function finished executing. Otherwise, we won't know when to
+ # stop waiting and return results.
+ num_done = caller_id_finished_count.Update(caller_id, 1)
+
+ if cls.multiprocessing_is_available:
+ _NotifyIfDone(caller_id, num_done)
+
+ def run(self):
+ while True:
+ task = self.task_queue.get()
+ caller_id = task.caller_id
+
+ # Get the instance of the command with the appropriate context.
+ cls = self.cached_classes.get(caller_id, None)
+ if not cls:
+ cls = copy.copy(class_map[caller_id])
+ cls.logger = CreateGsutilLogger(cls.command_name)
+ self.cached_classes[caller_id] = cls
+
+ self.PerformTask(task, cls)
+
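PerformTask above layers two try blocks so that neither the task function nor the caller-supplied exception handler can kill the worker thread. The same guard, extracted into a hypothetical free function purely for clarity (not part of this module):

    def _CallGuarded(func, exception_handler, logger, *args):
      # Run func; if it fails, delegate to the caller's handler, and never let
      # a failure inside the handler itself escape and kill the worker thread.
      try:
        return func(*args)
      except Exception as e:  # pylint: disable=broad-except
        try:
          exception_handler(e)
        except Exception:  # pylint: disable=broad-except
          logger.debug('Exception handler failed', exc_info=True)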
+
+class _SharedVariablesUpdater(object):
+ """Used to update shared variable for a class in the global map.
+
+ Note that each thread will have its own instance of the calling class for
+ context, and it will also have its own instance of a
+ _SharedVariablesUpdater. This is used in the following way:
+
+ 1. Before any tasks are performed, each thread will get a copy of the
+ calling class, and the globally-consistent value of this shared variable
+ will be initialized to whatever it was before the call to Apply began.
+
+ 2. After each time a thread performs a task, it will look at the current
+ values of the shared variables in its instance of the calling class.
+
+ 2.A. For each such variable, it computes the delta of this variable
+ between the last known value for this class (which is stored in
+ a dict local to this class) and the current value of the variable
+ in the class.
+
+    2.B. Using this delta, it updates the last known value locally as well
+         as the globally-consistent value shared across all classes (the
+         globally-consistent value is simply increased by the computed
+ delta).
+ """
+
+ def __init__(self):
+ self.last_shared_var_values = {}
+
+ def Update(self, caller_id, cls):
+ """Update any shared variables with their deltas."""
+ shared_vars = shared_vars_list_map.get(caller_id, None)
+ if shared_vars:
+ for name in shared_vars:
+ key = (caller_id, name)
+ last_value = self.last_shared_var_values.get(key, 0)
+ # Compute the change made since the last time we updated here. This is
+ # calculated by simply subtracting the last known value from the current
+ # value in the class instance.
+ delta = getattr(cls, name) - last_value
+ self.last_shared_var_values[key] = delta + last_value
+
+ # Update the globally-consistent value by simply increasing it by the
+ # computed delta.
+ shared_vars_map.Update(key, delta)
+
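The delta bookkeeping described in the class docstring can be seen in miniature below; _Totals and global_totals are invented stand-ins for a command instance and shared_vars_map, and the arithmetic mirrors Update() above:

    class _Totals(object):  # Hypothetical stand-in for a command instance.
      total_bytes = 0

    global_totals = {'total_bytes': 0}  # Stands in for shared_vars_map.

    class _LocalDeltaUpdater(object):
      def __init__(self):
        self.last = {}

      def Update(self, obj, name):
        last = self.last.get(name, 0)
        delta = getattr(obj, name) - last
        self.last[name] = last + delta  # Remember what has been folded in.
        global_totals[name] += delta    # Grow the global by the delta only.

    cls = _Totals()
    updater = _LocalDeltaUpdater()
    cls.total_bytes += 100
    updater.Update(cls, 'total_bytes')  # global_totals['total_bytes'] == 100
    cls.total_bytes += 50
    updater.Update(cls, 'total_bytes')  # global_totals['total_bytes'] == 150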
+
+def _NotifyIfDone(caller_id, num_done):
+ """Notify any threads waiting for results that something has finished.
+
+ Each waiting thread will then need to check the call_completed_map to see if
+ its work is done.
+
+ Note that num_done could be calculated here, but it is passed in as an
+ optimization so that we have one less call to a globally-locked data
+ structure.
+
+ Args:
+ caller_id: The caller_id of the function whose progress we're checking.
+ num_done: The number of tasks currently completed for that caller_id.
+ """
+ num_to_do = total_tasks[caller_id]
+ if num_to_do == num_done and num_to_do >= 0:
+ # Notify the Apply call that's sleeping that it's ready to return.
+ with need_pool_or_done_cond:
+ call_completed_map[caller_id] = True
+ need_pool_or_done_cond.notify_all()
+
+
+def ShutDownGsutil():
+ """Shut down all processes in consumer pools in preparation for exiting."""
+ for q in queues:
+ try:
+ q.cancel_join_thread()
+ except: # pylint: disable=bare-except
+ pass
+ for consumer_pool in consumer_pools:
+ consumer_pool.ShutDown()
+
+
+# pylint: disable=global-variable-undefined
+def _IncrementFailureCount():
+ global failure_count
+ if isinstance(failure_count, int):
+ failure_count += 1
+ else: # Otherwise it's a multiprocessing.Value() of type 'i'.
+ failure_count.value += 1
+
+
+# pylint: disable=global-variable-undefined
+def GetFailureCount():
+ """Returns the number of failures processed during calls to Apply()."""
+ try:
+ if isinstance(failure_count, int):
+ return failure_count
+ else: # It's a multiprocessing.Value() of type 'i'.
+ return failure_count.value
+ except NameError: # If it wasn't initialized, Apply() wasn't called.
+ return 0
+
+
+def ResetFailureCount():
+ """Resets the failure_count variable to 0 - useful if error is expected."""
+ try:
+ global failure_count
+ if isinstance(failure_count, int):
+ failure_count = 0
+ else: # It's a multiprocessing.Value() of type 'i'.
+ failure_count = multiprocessing.Value('i', 0)
+ except NameError: # If it wasn't initialized, Apply() wasn't called.
+ pass
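The three functions above all branch on whether failure_count is a plain int (single-process mode) or a multiprocessing.Value('i', ...) shared with child processes. A small, self-contained sketch of that dual representation (the _Increment helper is hypothetical, not part of this module):

    import multiprocessing

    def _Increment(counter):
      # Mirrors _IncrementFailureCount: ints are immutable, so the new value is
      # returned; a multiprocessing.Value is mutated in place and shared.
      if isinstance(counter, int):
        return counter + 1
      counter.value += 1
      return counter

    count = 0                              # Single-process mode.
    count = _Increment(count)              # count == 1
    count = multiprocessing.Value('i', 0)  # Multi-process mode.
    _Increment(count)                      # count.value == 1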