Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(505)

Unified Diff: tools/telemetry/third_party/gsutilz/gslib/command.py

Issue 1493973002: Remove telemetry/third_party/gsutilz (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@gsutil_changes
Patch Set: rebase Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: tools/telemetry/third_party/gsutilz/gslib/command.py
diff --git a/tools/telemetry/third_party/gsutilz/gslib/command.py b/tools/telemetry/third_party/gsutilz/gslib/command.py
deleted file mode 100644
index 3823b91505696bf08f8c3edd03d4af061a450575..0000000000000000000000000000000000000000
--- a/tools/telemetry/third_party/gsutilz/gslib/command.py
+++ /dev/null
@@ -1,1810 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2010 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Base class for gsutil commands.
-
-In addition to base class code, this file contains helpers that depend on base
-class state (such as GetAndPrintAcl) In general, functions that depend on
-class state and that are used by multiple commands belong in this file.
-Functions that don't depend on class state belong in util.py, and non-shared
-helpers belong in individual subclasses.
-"""
-
-from __future__ import absolute_import
-
-import codecs
-from collections import namedtuple
-import copy
-import getopt
-import logging
-import multiprocessing
-import os
-import Queue
-import signal
-import sys
-import textwrap
-import threading
-import traceback
-
-import boto
-from boto.storage_uri import StorageUri
-import gslib
-from gslib.cloud_api import AccessDeniedException
-from gslib.cloud_api import ArgumentException
-from gslib.cloud_api import ServiceException
-from gslib.cloud_api_delegator import CloudApiDelegator
-from gslib.cs_api_map import ApiSelector
-from gslib.cs_api_map import GsutilApiMapFactory
-from gslib.exception import CommandException
-from gslib.help_provider import HelpProvider
-from gslib.name_expansion import NameExpansionIterator
-from gslib.name_expansion import NameExpansionResult
-from gslib.parallelism_framework_util import AtomicIncrementDict
-from gslib.parallelism_framework_util import BasicIncrementDict
-from gslib.parallelism_framework_util import ThreadAndProcessSafeDict
-from gslib.plurality_checkable_iterator import PluralityCheckableIterator
-from gslib.sig_handling import RegisterSignalHandler
-from gslib.storage_url import StorageUrlFromString
-from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
-from gslib.translation_helper import AclTranslation
-from gslib.util import GetConfigFilePath
-from gslib.util import GsutilStreamHandler
-from gslib.util import HaveFileUrls
-from gslib.util import HaveProviderUrls
-from gslib.util import IS_WINDOWS
-from gslib.util import MultiprocessingIsAvailable
-from gslib.util import NO_MAX
-from gslib.util import UrlsAreForSingleProvider
-from gslib.util import UTF8
-from gslib.wildcard_iterator import CreateWildcardIterator
-
-OFFER_GSUTIL_M_SUGGESTION_THRESHOLD = 5
-
-if IS_WINDOWS:
- import ctypes # pylint: disable=g-import-not-at-top
-
-
-def _DefaultExceptionHandler(cls, e):
- cls.logger.exception(e)
-
-
-def CreateGsutilLogger(command_name):
- """Creates a logger that resembles 'print' output.
-
- This logger abides by gsutil -d/-D/-DD/-q options.
-
- By default (if none of the above options is specified) the logger will display
- all messages logged with level INFO or above. Log propagation is disabled.
-
- Args:
- command_name: Command name to create logger for.
-
- Returns:
- A logger object.
- """
- log = logging.getLogger(command_name)
- log.propagate = False
- log.setLevel(logging.root.level)
- log_handler = GsutilStreamHandler()
- log_handler.setFormatter(logging.Formatter('%(message)s'))
- # Commands that call other commands (like mv) would cause log handlers to be
- # added more than once, so avoid adding if one is already present.
- if not log.handlers:
- log.addHandler(log_handler)
- return log
-
-
-def _UrlArgChecker(command_instance, url):
- if not command_instance.exclude_symlinks:
- return True
- exp_src_url = url.expanded_storage_url
- if exp_src_url.IsFileUrl() and os.path.islink(exp_src_url.object_name):
- command_instance.logger.info('Skipping symbolic link %s...', exp_src_url)
- return False
- return True
-
-
-def DummyArgChecker(*unused_args):
- return True
-
-
-def SetAclFuncWrapper(cls, name_expansion_result, thread_state=None):
- return cls.SetAclFunc(name_expansion_result, thread_state=thread_state)
-
-
-def SetAclExceptionHandler(cls, e):
- """Exception handler that maintains state about post-completion status."""
- cls.logger.error(str(e))
- cls.everything_set_okay = False
-
-# We will keep this list of all thread- or process-safe queues ever created by
-# the main thread so that we can forcefully kill them upon shutdown. Otherwise,
-# we encounter a Python bug in which empty queues block forever on join (which
-# is called as part of the Python exit function cleanup) under the impression
-# that they are non-empty.
-# However, this also lets us shut down somewhat more cleanly when interrupted.
-queues = []
-
-
-def _NewMultiprocessingQueue():
- queue = multiprocessing.Queue(MAX_QUEUE_SIZE)
- queues.append(queue)
- return queue
-
-
-def _NewThreadsafeQueue():
- queue = Queue.Queue(MAX_QUEUE_SIZE)
- queues.append(queue)
- return queue
-
-# The maximum size of a process- or thread-safe queue. Imposing this limit
-# prevents us from needing to hold an arbitrary amount of data in memory.
-# However, setting this number too high (e.g., >= 32768 on OS X) can cause
-# problems on some operating systems.
-MAX_QUEUE_SIZE = 32500
-
-# That maximum depth of the tree of recursive calls to command.Apply. This is
-# an arbitrary limit put in place to prevent developers from accidentally
-# causing problems with infinite recursion, and it can be increased if needed.
-MAX_RECURSIVE_DEPTH = 5
-
-ZERO_TASKS_TO_DO_ARGUMENT = ('There were no', 'tasks to do')
-
-# Map from deprecated aliases to the current command and subcommands that
-# provide the same behavior.
-# TODO: Remove this map and deprecate old commands on 9/9/14.
-OLD_ALIAS_MAP = {'chacl': ['acl', 'ch'],
- 'getacl': ['acl', 'get'],
- 'setacl': ['acl', 'set'],
- 'getcors': ['cors', 'get'],
- 'setcors': ['cors', 'set'],
- 'chdefacl': ['defacl', 'ch'],
- 'getdefacl': ['defacl', 'get'],
- 'setdefacl': ['defacl', 'set'],
- 'disablelogging': ['logging', 'set', 'off'],
- 'enablelogging': ['logging', 'set', 'on'],
- 'getlogging': ['logging', 'get'],
- 'getversioning': ['versioning', 'get'],
- 'setversioning': ['versioning', 'set'],
- 'getwebcfg': ['web', 'get'],
- 'setwebcfg': ['web', 'set']}
-
-
-# Declare all of the module level variables - see
-# InitializeMultiprocessingVariables for an explanation of why this is
-# necessary.
-# pylint: disable=global-at-module-level
-global manager, consumer_pools, task_queues, caller_id_lock, caller_id_counter
-global total_tasks, call_completed_map, global_return_values_map
-global need_pool_or_done_cond, caller_id_finished_count, new_pool_needed
-global current_max_recursive_level, shared_vars_map, shared_vars_list_map
-global class_map, worker_checking_level_lock, failure_count
-
-
-def InitializeMultiprocessingVariables():
- """Initializes module-level variables that will be inherited by subprocesses.
-
- On Windows, a multiprocessing.Manager object should only
- be created within an "if __name__ == '__main__':" block. This function
- must be called, otherwise every command that calls Command.Apply will fail.
- """
- # This list of global variables must exactly match the above list of
- # declarations.
- # pylint: disable=global-variable-undefined
- global manager, consumer_pools, task_queues, caller_id_lock, caller_id_counter
- global total_tasks, call_completed_map, global_return_values_map
- global need_pool_or_done_cond, caller_id_finished_count, new_pool_needed
- global current_max_recursive_level, shared_vars_map, shared_vars_list_map
- global class_map, worker_checking_level_lock, failure_count
-
- manager = multiprocessing.Manager()
-
- consumer_pools = []
-
- # List of all existing task queues - used by all pools to find the queue
- # that's appropriate for the given recursive_apply_level.
- task_queues = []
-
- # Used to assign a globally unique caller ID to each Apply call.
- caller_id_lock = manager.Lock()
- caller_id_counter = multiprocessing.Value('i', 0)
-
- # Map from caller_id to total number of tasks to be completed for that ID.
- total_tasks = ThreadAndProcessSafeDict(manager)
-
- # Map from caller_id to a boolean which is True iff all its tasks are
- # finished.
- call_completed_map = ThreadAndProcessSafeDict(manager)
-
- # Used to keep track of the set of return values for each caller ID.
- global_return_values_map = AtomicIncrementDict(manager)
-
- # Condition used to notify any waiting threads that a task has finished or
- # that a call to Apply needs a new set of consumer processes.
- need_pool_or_done_cond = manager.Condition()
-
- # Lock used to prevent multiple worker processes from asking the main thread
- # to create a new consumer pool for the same level.
- worker_checking_level_lock = manager.Lock()
-
- # Map from caller_id to the current number of completed tasks for that ID.
- caller_id_finished_count = AtomicIncrementDict(manager)
-
- # Used as a way for the main thread to distinguish between being woken up
- # by another call finishing and being woken up by a call that needs a new set
- # of consumer processes.
- new_pool_needed = multiprocessing.Value('i', 0)
-
- current_max_recursive_level = multiprocessing.Value('i', 0)
-
- # Map from (caller_id, name) to the value of that shared variable.
- shared_vars_map = AtomicIncrementDict(manager)
- shared_vars_list_map = ThreadAndProcessSafeDict(manager)
-
- # Map from caller_id to calling class.
- class_map = manager.dict()
-
- # Number of tasks that resulted in an exception in calls to Apply().
- failure_count = multiprocessing.Value('i', 0)
-
-
-# Each subclass of Command must define a property named 'command_spec' that is
-# an instance of the following class.
-CommandSpec = namedtuple('CommandSpec', [
- # Name of command.
- 'command_name',
- # Usage synopsis.
- 'usage_synopsis',
- # List of command name aliases.
- 'command_name_aliases',
- # Min number of args required by this command.
- 'min_args',
- # Max number of args required by this command, or NO_MAX.
- 'max_args',
- # Getopt-style string specifying acceptable sub args.
- 'supported_sub_args',
- # True if file URLs are acceptable for this command.
- 'file_url_ok',
- # True if provider-only URLs are acceptable for this command.
- 'provider_url_ok',
- # Index in args of first URL arg.
- 'urls_start_arg',
- # List of supported APIs
- 'gs_api_support',
- # Default API to use for this command
- 'gs_default_api',
- # Private arguments (for internal testing)
- 'supported_private_args',
- 'argparse_arguments',
-])
-
-
-class Command(HelpProvider):
- """Base class for all gsutil commands."""
-
- # Each subclass must override this with an instance of CommandSpec.
- command_spec = None
-
- _commands_with_subcommands_and_subopts = ['acl', 'defacl', 'logging', 'web',
- 'notification']
-
- # This keeps track of the recursive depth of the current call to Apply.
- recursive_apply_level = 0
-
- # If the multiprocessing module isn't available, we'll use this to keep track
- # of the caller_id.
- sequential_caller_id = -1
-
- @staticmethod
- def CreateCommandSpec(command_name, usage_synopsis=None,
- command_name_aliases=None, min_args=0,
- max_args=NO_MAX, supported_sub_args='',
- file_url_ok=False, provider_url_ok=False,
- urls_start_arg=0, gs_api_support=None,
- gs_default_api=None, supported_private_args=None,
- argparse_arguments=None):
- """Creates an instance of CommandSpec, with defaults."""
- return CommandSpec(
- command_name=command_name,
- usage_synopsis=usage_synopsis,
- command_name_aliases=command_name_aliases or [],
- min_args=min_args,
- max_args=max_args,
- supported_sub_args=supported_sub_args,
- file_url_ok=file_url_ok,
- provider_url_ok=provider_url_ok,
- urls_start_arg=urls_start_arg,
- gs_api_support=gs_api_support or [ApiSelector.XML],
- gs_default_api=gs_default_api or ApiSelector.XML,
- supported_private_args=supported_private_args,
- argparse_arguments=argparse_arguments or [])
-
- # Define a convenience property for command name, since it's used many places.
- def _GetDefaultCommandName(self):
- return self.command_spec.command_name
- command_name = property(_GetDefaultCommandName)
-
- def _CalculateUrlsStartArg(self):
- """Calculate the index in args of the first URL arg.
-
- Returns:
- Index of the first URL arg (according to the command spec).
- """
- return self.command_spec.urls_start_arg
-
- def _TranslateDeprecatedAliases(self, args):
- """Map deprecated aliases to the corresponding new command, and warn."""
- new_command_args = OLD_ALIAS_MAP.get(self.command_alias_used, None)
- if new_command_args:
- # Prepend any subcommands for the new command. The command name itself
- # is not part of the args, so leave it out.
- args = new_command_args[1:] + args
- self.logger.warn('\n'.join(textwrap.wrap(
- ('You are using a deprecated alias, "%(used_alias)s", for the '
- '"%(command_name)s" command. This will stop working on 9/9/2014. '
- 'Please use "%(command_name)s" with the appropriate sub-command in '
- 'the future. See "gsutil help %(command_name)s" for details.') %
- {'used_alias': self.command_alias_used,
- 'command_name': self.command_name})))
- return args
-
- def __init__(self, command_runner, args, headers, debug, parallel_operations,
- bucket_storage_uri_class, gsutil_api_class_map_factory,
- test_method=None, logging_filters=None,
- command_alias_used=None):
- """Instantiates a Command.
-
- Args:
- command_runner: CommandRunner (for commands built atop other commands).
- args: Command-line args (arg0 = actual arg, not command name ala bash).
- headers: Dictionary containing optional HTTP headers to pass to boto.
- debug: Debug level to pass in to boto connection (range 0..3).
- parallel_operations: Should command operations be executed in parallel?
- bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
- Settable for testing/mocking.
- gsutil_api_class_map_factory: Creates map of cloud storage interfaces.
- Settable for testing/mocking.
- test_method: Optional general purpose method for testing purposes.
- Application and semantics of this method will vary by
- command and test type.
- logging_filters: Optional list of logging.Filters to apply to this
- command's logger.
- command_alias_used: The alias that was actually used when running this
- command (as opposed to the "official" command name,
- which will always correspond to the file name).
-
- Implementation note: subclasses shouldn't need to define an __init__
- method, and instead depend on the shared initialization that happens
- here. If you do define an __init__ method in a subclass you'll need to
- explicitly call super().__init__(). But you're encouraged not to do this,
- because it will make changing the __init__ interface more painful.
- """
- # Save class values from constructor params.
- self.command_runner = command_runner
- self.unparsed_args = args
- self.headers = headers
- self.debug = debug
- self.parallel_operations = parallel_operations
- self.bucket_storage_uri_class = bucket_storage_uri_class
- self.gsutil_api_class_map_factory = gsutil_api_class_map_factory
- self.test_method = test_method
- self.exclude_symlinks = False
- self.recursion_requested = False
- self.all_versions = False
- self.command_alias_used = command_alias_used
-
- # Global instance of a threaded logger object.
- self.logger = CreateGsutilLogger(self.command_name)
- if logging_filters:
- for log_filter in logging_filters:
- self.logger.addFilter(log_filter)
-
- if self.command_spec is None:
- raise CommandException('"%s" command implementation is missing a '
- 'command_spec definition.' % self.command_name)
-
- # Parse and validate args.
- self.args = self._TranslateDeprecatedAliases(args)
- self.ParseSubOpts()
-
- # Named tuple public functions start with _
- # pylint: disable=protected-access
- self.command_spec = self.command_spec._replace(
- urls_start_arg=self._CalculateUrlsStartArg())
-
- if (len(self.args) < self.command_spec.min_args
- or len(self.args) > self.command_spec.max_args):
- self.RaiseWrongNumberOfArgumentsException()
-
- if self.command_name not in self._commands_with_subcommands_and_subopts:
- self.CheckArguments()
-
- # Build the support and default maps from the command spec.
- support_map = {
- 'gs': self.command_spec.gs_api_support,
- 's3': [ApiSelector.XML]
- }
- default_map = {
- 'gs': self.command_spec.gs_default_api,
- 's3': ApiSelector.XML
- }
- self.gsutil_api_map = GsutilApiMapFactory.GetApiMap(
- self.gsutil_api_class_map_factory, support_map, default_map)
-
- self.project_id = None
- self.gsutil_api = CloudApiDelegator(
- bucket_storage_uri_class, self.gsutil_api_map,
- self.logger, debug=self.debug)
-
- # Cross-platform path to run gsutil binary.
- self.gsutil_cmd = ''
- # If running on Windows, invoke python interpreter explicitly.
- if gslib.util.IS_WINDOWS:
- self.gsutil_cmd += 'python '
- # Add full path to gsutil to make sure we test the correct version.
- self.gsutil_path = gslib.GSUTIL_PATH
- self.gsutil_cmd += self.gsutil_path
-
- # We're treating recursion_requested like it's used by all commands, but
- # only some of the commands accept the -R option.
- if self.sub_opts:
- for o, unused_a in self.sub_opts:
- if o == '-r' or o == '-R':
- self.recursion_requested = True
- break
-
- self.multiprocessing_is_available = MultiprocessingIsAvailable()[0]
-
- def RaiseWrongNumberOfArgumentsException(self):
- """Raises exception for wrong number of arguments supplied to command."""
- if len(self.args) < self.command_spec.min_args:
- tail_str = 's' if self.command_spec.min_args > 1 else ''
- message = ('The %s command requires at least %d argument%s.' %
- (self.command_name, self.command_spec.min_args, tail_str))
- else:
- message = ('The %s command accepts at most %d arguments.' %
- (self.command_name, self.command_spec.max_args))
- message += ' Usage:\n%s\nFor additional help run:\n gsutil help %s' % (
- self.command_spec.usage_synopsis, self.command_name)
- raise CommandException(message)
-
- def RaiseInvalidArgumentException(self):
- """Raises exception for specifying an invalid argument to command."""
- message = ('Incorrect option(s) specified. Usage:\n%s\n'
- 'For additional help run:\n gsutil help %s' % (
- self.command_spec.usage_synopsis, self.command_name))
- raise CommandException(message)
-
- def ParseSubOpts(self, check_args=False):
- """Parses sub-opt args.
-
- Args:
- check_args: True to have CheckArguments() called after parsing.
-
- Populates:
- (self.sub_opts, self.args) from parsing.
-
- Raises: RaiseInvalidArgumentException if invalid args specified.
- """
- try:
- self.sub_opts, self.args = getopt.getopt(
- self.args, self.command_spec.supported_sub_args,
- self.command_spec.supported_private_args or [])
- except getopt.GetoptError:
- self.RaiseInvalidArgumentException()
- if check_args:
- self.CheckArguments()
-
- def CheckArguments(self):
- """Checks that command line arguments match the command_spec.
-
- Any commands in self._commands_with_subcommands_and_subopts are responsible
- for calling this method after handling initial parsing of their arguments.
- This prevents commands with sub-commands as well as options from breaking
- the parsing of getopt.
-
- TODO: Provide a function to parse commands and sub-commands more
- intelligently once we stop allowing the deprecated command versions.
-
- Raises:
- CommandException if the arguments don't match.
- """
-
- if (not self.command_spec.file_url_ok
- and HaveFileUrls(self.args[self.command_spec.urls_start_arg:])):
- raise CommandException('"%s" command does not support "file://" URLs. '
- 'Did you mean to use a gs:// URL?' %
- self.command_name)
- if (not self.command_spec.provider_url_ok
- and HaveProviderUrls(self.args[self.command_spec.urls_start_arg:])):
- raise CommandException('"%s" command does not support provider-only '
- 'URLs.' % self.command_name)
-
- def WildcardIterator(self, url_string, all_versions=False):
- """Helper to instantiate gslib.WildcardIterator.
-
- Args are same as gslib.WildcardIterator interface, but this method fills in
- most of the values from instance state.
-
- Args:
- url_string: URL string naming wildcard objects to iterate.
- all_versions: If true, the iterator yields all versions of objects
- matching the wildcard. If false, yields just the live
- object version.
-
- Returns:
- WildcardIterator for use by caller.
- """
- return CreateWildcardIterator(
- url_string, self.gsutil_api, all_versions=all_versions,
- debug=self.debug, project_id=self.project_id)
-
- def RunCommand(self):
- """Abstract function in base class. Subclasses must implement this.
-
- The return value of this function will be used as the exit status of the
- process, so subclass commands should return an integer exit code (0 for
- success, a value in [1,255] for failure).
- """
- raise CommandException('Command %s is missing its RunCommand() '
- 'implementation' % self.command_name)
-
- ############################################################
- # Shared helper functions that depend on base class state. #
- ############################################################
-
- def ApplyAclFunc(self, acl_func, acl_excep_handler, url_strs):
- """Sets the standard or default object ACL depending on self.command_name.
-
- Args:
- acl_func: ACL function to be passed to Apply.
- acl_excep_handler: ACL exception handler to be passed to Apply.
- url_strs: URL strings on which to set ACL.
-
- Raises:
- CommandException if an ACL could not be set.
- """
- multi_threaded_url_args = []
- # Handle bucket ACL setting operations single-threaded, because
- # our threading machinery currently assumes it's working with objects
- # (name_expansion_iterator), and normally we wouldn't expect users to need
- # to set ACLs on huge numbers of buckets at once anyway.
- for url_str in url_strs:
- url = StorageUrlFromString(url_str)
- if url.IsCloudUrl() and url.IsBucket():
- if self.recursion_requested:
- # If user specified -R option, convert any bucket args to bucket
- # wildcards (e.g., gs://bucket/*), to prevent the operation from
- # being applied to the buckets themselves.
- url.object_name = '*'
- multi_threaded_url_args.append(url.url_string)
- else:
- # Convert to a NameExpansionResult so we can re-use the threaded
- # function for the single-threaded implementation. RefType is unused.
- for blr in self.WildcardIterator(url.url_string).IterBuckets(
- bucket_fields=['id']):
- name_expansion_for_url = NameExpansionResult(
- url, False, False, blr.storage_url)
- acl_func(self, name_expansion_for_url)
- else:
- multi_threaded_url_args.append(url_str)
-
- if len(multi_threaded_url_args) >= 1:
- name_expansion_iterator = NameExpansionIterator(
- self.command_name, self.debug,
- self.logger, self.gsutil_api,
- multi_threaded_url_args, self.recursion_requested,
- all_versions=self.all_versions,
- continue_on_error=self.continue_on_error or self.parallel_operations)
-
- # Perform requests in parallel (-m) mode, if requested, using
- # configured number of parallel processes and threads. Otherwise,
- # perform requests with sequential function calls in current process.
- self.Apply(acl_func, name_expansion_iterator, acl_excep_handler,
- fail_on_error=not self.continue_on_error)
-
- if not self.everything_set_okay and not self.continue_on_error:
- raise CommandException('ACLs for some objects could not be set.')
-
- def SetAclFunc(self, name_expansion_result, thread_state=None):
- """Sets the object ACL for the name_expansion_result provided.
-
- Args:
- name_expansion_result: NameExpansionResult describing the target object.
- thread_state: If present, use this gsutil Cloud API instance for the set.
- """
- if thread_state:
- assert not self.def_acl
- gsutil_api = thread_state
- else:
- gsutil_api = self.gsutil_api
- op_string = 'default object ACL' if self.def_acl else 'ACL'
- url = name_expansion_result.expanded_storage_url
- self.logger.info('Setting %s on %s...', op_string, url)
- if (gsutil_api.GetApiSelector(url.scheme) == ApiSelector.XML
- and url.scheme != 'gs'):
- # If we are called with a non-google ACL model, we need to use the XML
- # passthrough. acl_arg should either be a canned ACL or an XML ACL.
- self._SetAclXmlPassthrough(url, gsutil_api)
- else:
- # Normal Cloud API path. acl_arg is a JSON ACL or a canned ACL.
- self._SetAclGsutilApi(url, gsutil_api)
-
- def _SetAclXmlPassthrough(self, url, gsutil_api):
- """Sets the ACL for the URL provided using the XML passthrough functions.
-
- This function assumes that self.def_acl, self.canned,
- and self.continue_on_error are initialized, and that self.acl_arg is
- either an XML string or a canned ACL string.
-
- Args:
- url: CloudURL to set the ACL on.
- gsutil_api: gsutil Cloud API to use for the ACL set. Must support XML
- passthrough functions.
- """
- try:
- orig_prefer_api = gsutil_api.prefer_api
- gsutil_api.prefer_api = ApiSelector.XML
- gsutil_api.XmlPassThroughSetAcl(
- self.acl_arg, url, canned=self.canned,
- def_obj_acl=self.def_acl, provider=url.scheme)
- except ServiceException as e:
- if self.continue_on_error:
- self.everything_set_okay = False
- self.logger.error(e)
- else:
- raise
- finally:
- gsutil_api.prefer_api = orig_prefer_api
-
- def _SetAclGsutilApi(self, url, gsutil_api):
- """Sets the ACL for the URL provided using the gsutil Cloud API.
-
- This function assumes that self.def_acl, self.canned,
- and self.continue_on_error are initialized, and that self.acl_arg is
- either a JSON string or a canned ACL string.
-
- Args:
- url: CloudURL to set the ACL on.
- gsutil_api: gsutil Cloud API to use for the ACL set.
- """
- try:
- if url.IsBucket():
- if self.def_acl:
- if self.canned:
- gsutil_api.PatchBucket(
- url.bucket_name, apitools_messages.Bucket(),
- canned_def_acl=self.acl_arg, provider=url.scheme, fields=['id'])
- else:
- def_obj_acl = AclTranslation.JsonToMessage(
- self.acl_arg, apitools_messages.ObjectAccessControl)
- bucket_metadata = apitools_messages.Bucket(
- defaultObjectAcl=def_obj_acl)
- gsutil_api.PatchBucket(url.bucket_name, bucket_metadata,
- provider=url.scheme, fields=['id'])
- else:
- if self.canned:
- gsutil_api.PatchBucket(
- url.bucket_name, apitools_messages.Bucket(),
- canned_acl=self.acl_arg, provider=url.scheme, fields=['id'])
- else:
- bucket_acl = AclTranslation.JsonToMessage(
- self.acl_arg, apitools_messages.BucketAccessControl)
- bucket_metadata = apitools_messages.Bucket(acl=bucket_acl)
- gsutil_api.PatchBucket(url.bucket_name, bucket_metadata,
- provider=url.scheme, fields=['id'])
- else: # url.IsObject()
- if self.canned:
- gsutil_api.PatchObjectMetadata(
- url.bucket_name, url.object_name, apitools_messages.Object(),
- provider=url.scheme, generation=url.generation,
- canned_acl=self.acl_arg)
- else:
- object_acl = AclTranslation.JsonToMessage(
- self.acl_arg, apitools_messages.ObjectAccessControl)
- object_metadata = apitools_messages.Object(acl=object_acl)
- gsutil_api.PatchObjectMetadata(url.bucket_name, url.object_name,
- object_metadata, provider=url.scheme,
- generation=url.generation)
- except ArgumentException, e:
- raise
- except ServiceException, e:
- if self.continue_on_error:
- self.everything_set_okay = False
- self.logger.error(e)
- else:
- raise
-
- def SetAclCommandHelper(self, acl_func, acl_excep_handler):
- """Sets ACLs on the self.args using the passed-in acl function.
-
- Args:
- acl_func: ACL function to be passed to Apply.
- acl_excep_handler: ACL exception handler to be passed to Apply.
- """
- acl_arg = self.args[0]
- url_args = self.args[1:]
- # Disallow multi-provider setacl requests, because there are differences in
- # the ACL models.
- if not UrlsAreForSingleProvider(url_args):
- raise CommandException('"%s" command spanning providers not allowed.' %
- self.command_name)
-
- # Determine whether acl_arg names a file containing XML ACL text vs. the
- # string name of a canned ACL.
- if os.path.isfile(acl_arg):
- with codecs.open(acl_arg, 'r', UTF8) as f:
- acl_arg = f.read()
- self.canned = False
- else:
- # No file exists, so expect a canned ACL string.
- # Canned ACLs are not supported in JSON and we need to use the XML API
- # to set them.
- # validate=False because we allow wildcard urls.
- storage_uri = boto.storage_uri(
- url_args[0], debug=self.debug, validate=False,
- bucket_storage_uri_class=self.bucket_storage_uri_class)
-
- canned_acls = storage_uri.canned_acls()
- if acl_arg not in canned_acls:
- raise CommandException('Invalid canned ACL "%s".' % acl_arg)
- self.canned = True
-
- # Used to track if any ACLs failed to be set.
- self.everything_set_okay = True
- self.acl_arg = acl_arg
-
- self.ApplyAclFunc(acl_func, acl_excep_handler, url_args)
- if not self.everything_set_okay and not self.continue_on_error:
- raise CommandException('ACLs for some objects could not be set.')
-
- def _WarnServiceAccounts(self):
- """Warns service account users who have received an AccessDenied error.
-
- When one of the metadata-related commands fails due to AccessDenied, user
- must ensure that they are listed as an Owner in the API console.
- """
- # Import this here so that the value will be set first in
- # gcs_oauth2_boto_plugin.
- # pylint: disable=g-import-not-at-top
- from gcs_oauth2_boto_plugin.oauth2_plugin import IS_SERVICE_ACCOUNT
-
- if IS_SERVICE_ACCOUNT:
- # This method is only called when canned ACLs are used, so the warning
- # definitely applies.
- self.logger.warning('\n'.join(textwrap.wrap(
- 'It appears that your service account has been denied access while '
- 'attempting to perform a metadata operation. If you believe that you '
- 'should have access to this metadata (i.e., if it is associated with '
- 'your account), please make sure that your service account''s email '
- 'address is listed as an Owner in the Team tab of the API console. '
- 'See "gsutil help creds" for further information.\n')))
-
- def GetAndPrintAcl(self, url_str):
- """Prints the standard or default object ACL depending on self.command_name.
-
- Args:
- url_str: URL string to get ACL for.
- """
- blr = self.GetAclCommandBucketListingReference(url_str)
- url = StorageUrlFromString(url_str)
- if (self.gsutil_api.GetApiSelector(url.scheme) == ApiSelector.XML
- and url.scheme != 'gs'):
- # Need to use XML passthrough.
- try:
- acl = self.gsutil_api.XmlPassThroughGetAcl(
- url, def_obj_acl=self.def_acl, provider=url.scheme)
- print acl.to_xml()
- except AccessDeniedException, _:
- self._WarnServiceAccounts()
- raise
- else:
- if self.command_name == 'defacl':
- acl = blr.root_object.defaultObjectAcl
- if not acl:
- self.logger.warn(
- 'No default object ACL present for %s. This could occur if '
- 'the default object ACL is private, in which case objects '
- 'created in this bucket will be readable only by their '
- 'creators. It could also mean you do not have OWNER permission '
- 'on %s and therefore do not have permission to read the '
- 'default object ACL.', url_str, url_str)
- else:
- acl = blr.root_object.acl
- if not acl:
- self._WarnServiceAccounts()
- raise AccessDeniedException('Access denied. Please ensure you have '
- 'OWNER permission on %s.' % url_str)
- print AclTranslation.JsonFromMessage(acl)
-
- def GetAclCommandBucketListingReference(self, url_str):
- """Gets a single bucket listing reference for an acl get command.
-
- Args:
- url_str: URL string to get the bucket listing reference for.
-
- Returns:
- BucketListingReference for the URL string.
-
- Raises:
- CommandException if string did not result in exactly one reference.
- """
- # We're guaranteed by caller that we have the appropriate type of url
- # string for the call (ex. we will never be called with an object string
- # by getdefacl)
- wildcard_url = StorageUrlFromString(url_str)
- if wildcard_url.IsObject():
- plurality_iter = PluralityCheckableIterator(
- self.WildcardIterator(url_str).IterObjects(
- bucket_listing_fields=['acl']))
- else:
- # Bucket or provider. We call IterBuckets explicitly here to ensure that
- # the root object is populated with the acl.
- if self.command_name == 'defacl':
- bucket_fields = ['defaultObjectAcl']
- else:
- bucket_fields = ['acl']
- plurality_iter = PluralityCheckableIterator(
- self.WildcardIterator(url_str).IterBuckets(
- bucket_fields=bucket_fields))
- if plurality_iter.IsEmpty():
- raise CommandException('No URLs matched')
- if plurality_iter.HasPlurality():
- raise CommandException(
- '%s matched more than one URL, which is not allowed by the %s '
- 'command' % (url_str, self.command_name))
- return list(plurality_iter)[0]
-
- def _HandleMultiProcessingSigs(self, unused_signal_num,
- unused_cur_stack_frame):
- """Handles signals INT AND TERM during a multi-process/multi-thread request.
-
- Kills subprocesses.
-
- Args:
- unused_signal_num: signal generated by ^C.
- unused_cur_stack_frame: Current stack frame.
- """
- # Note: This only works under Linux/MacOS. See
- # https://github.com/GoogleCloudPlatform/gsutil/issues/99 for details
- # about why making it work correctly across OS's is harder and still open.
- ShutDownGsutil()
- sys.stderr.write('Caught ^C - exiting\n')
- # Simply calling sys.exit(1) doesn't work - see above bug for details.
- KillProcess(os.getpid())
-
- def GetSingleBucketUrlFromArg(self, arg, bucket_fields=None):
- """Gets a single bucket URL based on the command arguments.
-
- Args:
- arg: String argument to get bucket URL for.
- bucket_fields: Fields to populate for the bucket.
-
- Returns:
- (StorageUrl referring to a single bucket, Bucket metadata).
-
- Raises:
- CommandException if args did not match exactly one bucket.
- """
- plurality_checkable_iterator = self.GetBucketUrlIterFromArg(
- arg, bucket_fields=bucket_fields)
- if plurality_checkable_iterator.HasPlurality():
- raise CommandException(
- '%s matched more than one URL, which is not\n'
- 'allowed by the %s command' % (arg, self.command_name))
- blr = list(plurality_checkable_iterator)[0]
- return StorageUrlFromString(blr.url_string), blr.root_object
-
- def GetBucketUrlIterFromArg(self, arg, bucket_fields=None):
- """Gets a single bucket URL based on the command arguments.
-
- Args:
- arg: String argument to iterate over.
- bucket_fields: Fields to populate for the bucket.
-
- Returns:
- PluralityCheckableIterator over buckets.
-
- Raises:
- CommandException if iterator matched no buckets.
- """
- arg_url = StorageUrlFromString(arg)
- if not arg_url.IsCloudUrl() or arg_url.IsObject():
- raise CommandException('"%s" command must specify a bucket' %
- self.command_name)
-
- plurality_checkable_iterator = PluralityCheckableIterator(
- self.WildcardIterator(arg).IterBuckets(
- bucket_fields=bucket_fields))
- if plurality_checkable_iterator.IsEmpty():
- raise CommandException('No URLs matched')
- return plurality_checkable_iterator
-
- ######################
- # Private functions. #
- ######################
-
- def _ResetConnectionPool(self):
- # Each OS process needs to establish its own set of connections to
- # the server to avoid writes from different OS processes interleaving
- # onto the same socket (and garbling the underlying SSL session).
- # We ensure each process gets its own set of connections here by
- # closing all connections in the storage provider connection pool.
- connection_pool = StorageUri.provider_pool
- if connection_pool:
- for i in connection_pool:
- connection_pool[i].connection.close()
-
- def _GetProcessAndThreadCount(self, process_count, thread_count,
- parallel_operations_override):
- """Determines the values of process_count and thread_count.
-
- These values are used for parallel operations.
- If we're not performing operations in parallel, then ignore
- existing values and use process_count = thread_count = 1.
-
- Args:
- process_count: A positive integer or None. In the latter case, we read
- the value from the .boto config file.
- thread_count: A positive integer or None. In the latter case, we read
- the value from the .boto config file.
- parallel_operations_override: Used to override self.parallel_operations.
- This allows the caller to safely override
- the top-level flag for a single call.
-
- Returns:
- (process_count, thread_count): The number of processes and threads to use,
- respectively.
- """
- # Set OS process and python thread count as a function of options
- # and config.
- if self.parallel_operations or parallel_operations_override:
- if not process_count:
- process_count = boto.config.getint(
- 'GSUtil', 'parallel_process_count',
- gslib.commands.config.DEFAULT_PARALLEL_PROCESS_COUNT)
- if process_count < 1:
- raise CommandException('Invalid parallel_process_count "%d".' %
- process_count)
- if not thread_count:
- thread_count = boto.config.getint(
- 'GSUtil', 'parallel_thread_count',
- gslib.commands.config.DEFAULT_PARALLEL_THREAD_COUNT)
- if thread_count < 1:
- raise CommandException('Invalid parallel_thread_count "%d".' %
- thread_count)
- else:
- # If -m not specified, then assume 1 OS process and 1 Python thread.
- process_count = 1
- thread_count = 1
-
- if IS_WINDOWS and process_count > 1:
- raise CommandException('\n'.join(textwrap.wrap(
- ('It is not possible to set process_count > 1 on Windows. Please '
- 'update your config file (located at %s) and set '
- '"parallel_process_count = 1".') %
- GetConfigFilePath())))
- self.logger.debug('process count: %d', process_count)
- self.logger.debug('thread count: %d', thread_count)
-
- return (process_count, thread_count)
-
- def _SetUpPerCallerState(self):
- """Set up the state for a caller id, corresponding to one Apply call."""
- # Get a new caller ID.
- with caller_id_lock:
- caller_id_counter.value += 1
- caller_id = caller_id_counter.value
-
- # Create a copy of self with an incremented recursive level. This allows
- # the class to report its level correctly if the function called from it
- # also needs to call Apply.
- cls = copy.copy(self)
- cls.recursive_apply_level += 1
-
- # Thread-safe loggers can't be pickled, so we will remove it here and
- # recreate it later in the WorkerThread. This is not a problem since any
- # logger with the same name will be treated as a singleton.
- cls.logger = None
-
- # Likewise, the default API connection can't be pickled, but it is unused
- # anyway as each thread gets its own API delegator.
- cls.gsutil_api = None
-
- class_map[caller_id] = cls
- total_tasks[caller_id] = -1 # -1 => the producer hasn't finished yet.
- call_completed_map[caller_id] = False
- caller_id_finished_count.Put(caller_id, 0)
- global_return_values_map.Put(caller_id, [])
- return caller_id
-
- def _CreateNewConsumerPool(self, num_processes, num_threads):
- """Create a new pool of processes that call _ApplyThreads."""
- processes = []
- task_queue = _NewMultiprocessingQueue()
- task_queues.append(task_queue)
-
- current_max_recursive_level.value += 1
- if current_max_recursive_level.value > MAX_RECURSIVE_DEPTH:
- raise CommandException('Recursion depth of Apply calls is too great.')
- for _ in range(num_processes):
- recursive_apply_level = len(consumer_pools)
- p = multiprocessing.Process(
- target=self._ApplyThreads,
- args=(num_threads, num_processes, recursive_apply_level))
- p.daemon = True
- processes.append(p)
- p.start()
- consumer_pool = _ConsumerPool(processes, task_queue)
- consumer_pools.append(consumer_pool)
-
- def Apply(self, func, args_iterator, exception_handler,
- shared_attrs=None, arg_checker=_UrlArgChecker,
- parallel_operations_override=False, process_count=None,
- thread_count=None, should_return_results=False,
- fail_on_error=False):
- """Calls _Parallel/SequentialApply based on multiprocessing availability.
-
- Args:
- func: Function to call to process each argument.
- args_iterator: Iterable collection of arguments to be put into the
- work queue.
- exception_handler: Exception handler for WorkerThread class.
- shared_attrs: List of attributes to manage across sub-processes.
- arg_checker: Used to determine whether we should process the current
- argument or simply skip it. Also handles any logging that
- is specific to a particular type of argument.
- parallel_operations_override: Used to override self.parallel_operations.
- This allows the caller to safely override
- the top-level flag for a single call.
- process_count: The number of processes to use. If not specified, then
- the configured default will be used.
- thread_count: The number of threads per process. If not speficied, then
- the configured default will be used..
- should_return_results: If true, then return the results of all successful
- calls to func in a list.
- fail_on_error: If true, then raise any exceptions encountered when
- executing func. This is only applicable in the case of
- process_count == thread_count == 1.
-
- Returns:
- Results from spawned threads.
- """
- if shared_attrs:
- original_shared_vars_values = {} # We'll add these back in at the end.
- for name in shared_attrs:
- original_shared_vars_values[name] = getattr(self, name)
- # By setting this to 0, we simplify the logic for computing deltas.
- # We'll add it back after all of the tasks have been performed.
- setattr(self, name, 0)
-
- (process_count, thread_count) = self._GetProcessAndThreadCount(
- process_count, thread_count, parallel_operations_override)
-
- is_main_thread = (self.recursive_apply_level == 0
- and self.sequential_caller_id == -1)
-
- # We don't honor the fail_on_error flag in the case of multiple threads
- # or processes.
- fail_on_error = fail_on_error and (process_count * thread_count == 1)
-
- # Only check this from the first call in the main thread. Apart from the
- # fact that it's wasteful to try this multiple times in general, it also
- # will never work when called from a subprocess since we use daemon
- # processes, and daemons can't create other processes.
- if is_main_thread:
- if ((not self.multiprocessing_is_available)
- and thread_count * process_count > 1):
- # Run the check again and log the appropriate warnings. This was run
- # before, when the Command object was created, in order to calculate
- # self.multiprocessing_is_available, but we don't want to print the
- # warning until we're sure the user actually tried to use multiple
- # threads or processes.
- MultiprocessingIsAvailable(logger=self.logger)
-
- if self.multiprocessing_is_available:
- caller_id = self._SetUpPerCallerState()
- else:
- self.sequential_caller_id += 1
- caller_id = self.sequential_caller_id
-
- if is_main_thread:
- # pylint: disable=global-variable-undefined
- global global_return_values_map, shared_vars_map, failure_count
- global caller_id_finished_count, shared_vars_list_map
- global_return_values_map = BasicIncrementDict()
- global_return_values_map.Put(caller_id, [])
- shared_vars_map = BasicIncrementDict()
- caller_id_finished_count = BasicIncrementDict()
- shared_vars_list_map = {}
- failure_count = 0
-
- # If any shared attributes passed by caller, create a dictionary of
- # shared memory variables for every element in the list of shared
- # attributes.
- if shared_attrs:
- shared_vars_list_map[caller_id] = shared_attrs
- for name in shared_attrs:
- shared_vars_map.Put((caller_id, name), 0)
-
- # Make all of the requested function calls.
- if self.multiprocessing_is_available and thread_count * process_count > 1:
- self._ParallelApply(func, args_iterator, exception_handler, caller_id,
- arg_checker, process_count, thread_count,
- should_return_results, fail_on_error)
- else:
- self._SequentialApply(func, args_iterator, exception_handler, caller_id,
- arg_checker, should_return_results, fail_on_error)
-
- if shared_attrs:
- for name in shared_attrs:
- # This allows us to retain the original value of the shared variable,
- # and simply apply the delta after what was done during the call to
- # apply.
- final_value = (original_shared_vars_values[name] +
- shared_vars_map.Get((caller_id, name)))
- setattr(self, name, final_value)
-
- if should_return_results:
- return global_return_values_map.Get(caller_id)
-
- def _MaybeSuggestGsutilDashM(self):
- """Outputs a sugestion to the user to use gsutil -m."""
- if not (boto.config.getint('GSUtil', 'parallel_process_count', 0) == 1 and
- boto.config.getint('GSUtil', 'parallel_thread_count', 0) == 1):
- self.logger.info('\n' + textwrap.fill(
- '==> NOTE: You are performing a sequence of gsutil operations that '
- 'may run significantly faster if you instead use gsutil -m %s ...\n'
- 'Please see the -m section under "gsutil help options" for further '
- 'information about when gsutil -m can be advantageous.'
- % sys.argv[1]) + '\n')
-
- # pylint: disable=g-doc-args
- def _SequentialApply(self, func, args_iterator, exception_handler, caller_id,
- arg_checker, should_return_results, fail_on_error):
- """Performs all function calls sequentially in the current thread.
-
- No other threads or processes will be spawned. This degraded functionality
- is used when the multiprocessing module is not available or the user
- requests only one thread and one process.
- """
- # Create a WorkerThread to handle all of the logic needed to actually call
- # the function. Note that this thread will never be started, and all work
- # is done in the current thread.
- worker_thread = WorkerThread(None, False)
- args_iterator = iter(args_iterator)
- # Count of sequential calls that have been made. Used for producing
- # suggestion to use gsutil -m.
- sequential_call_count = 0
- while True:
-
- # Try to get the next argument, handling any exceptions that arise.
- try:
- args = args_iterator.next()
- except StopIteration, e:
- break
- except Exception, e: # pylint: disable=broad-except
- _IncrementFailureCount()
- if fail_on_error:
- raise
- else:
- try:
- exception_handler(self, e)
- except Exception, _: # pylint: disable=broad-except
- self.logger.debug(
- 'Caught exception while handling exception for %s:\n%s',
- func, traceback.format_exc())
- continue
-
- sequential_call_count += 1
- if sequential_call_count == OFFER_GSUTIL_M_SUGGESTION_THRESHOLD:
- # Output suggestion near beginning of run, so user sees it early and can
- # ^C and try gsutil -m.
- self._MaybeSuggestGsutilDashM()
- if arg_checker(self, args):
- # Now that we actually have the next argument, perform the task.
- task = Task(func, args, caller_id, exception_handler,
- should_return_results, arg_checker, fail_on_error)
- worker_thread.PerformTask(task, self)
- if sequential_call_count >= gslib.util.GetTermLines():
- # Output suggestion at end of long run, in case user missed it at the
- # start and it scrolled off-screen.
- self._MaybeSuggestGsutilDashM()
-
- # pylint: disable=g-doc-args
- def _ParallelApply(self, func, args_iterator, exception_handler, caller_id,
- arg_checker, process_count, thread_count,
- should_return_results, fail_on_error):
- """Dispatches input arguments across a thread/process pool.
-
- Pools are composed of parallel OS processes and/or Python threads,
- based on options (-m or not) and settings in the user's config file.
-
- If only one OS process is requested/available, dispatch requests across
- threads in the current OS process.
-
- In the multi-process case, we will create one pool of worker processes for
- each level of the tree of recursive calls to Apply. E.g., if A calls
- Apply(B), and B ultimately calls Apply(C) followed by Apply(D), then we
- will only create two sets of worker processes - B will execute in the first,
- and C and D will execute in the second. If C is then changed to call
- Apply(E) and D is changed to call Apply(F), then we will automatically
- create a third set of processes (lazily, when needed) that will be used to
- execute calls to E and F. This might look something like:
-
- Pool1 Executes: B
- / \
- Pool2 Executes: C D
- / \
- Pool3 Executes: E F
-
- Apply's parallelism is generally broken up into 4 cases:
- - If process_count == thread_count == 1, then all tasks will be executed
- by _SequentialApply.
- - If process_count > 1 and thread_count == 1, then the main thread will
- create a new pool of processes (if they don't already exist) and each of
- those processes will execute the tasks in a single thread.
- - If process_count == 1 and thread_count > 1, then this process will create
- a new pool of threads to execute the tasks.
- - If process_count > 1 and thread_count > 1, then the main thread will
- create a new pool of processes (if they don't already exist) and each of
- those processes will, upon creation, create a pool of threads to
- execute the tasks.
-
- Args:
- caller_id: The caller ID unique to this call to command.Apply.
- See command.Apply for description of other arguments.
- """
- is_main_thread = self.recursive_apply_level == 0
-
- # Catch SIGINT and SIGTERM under Linux/MacOs so we can do cleanup before
- # exiting.
- if not IS_WINDOWS and is_main_thread:
- # Register as a final signal handler because this handler kills the
- # main gsutil process (so it must run last).
- RegisterSignalHandler(signal.SIGINT, self._HandleMultiProcessingSigs,
- is_final_handler=True)
- RegisterSignalHandler(signal.SIGTERM, self._HandleMultiProcessingSigs,
- is_final_handler=True)
-
- if not task_queues:
- # The process we create will need to access the next recursive level
- # of task queues if it makes a call to Apply, so we always keep around
- # one more queue than we know we need. OTOH, if we don't create a new
- # process, the existing process still needs a task queue to use.
- task_queues.append(_NewMultiprocessingQueue())
-
- if process_count > 1: # Handle process pool creation.
- # Check whether this call will need a new set of workers.
-
- # Each worker must acquire a shared lock before notifying the main thread
- # that it needs a new worker pool, so that at most one worker asks for
- # a new worker pool at once.
- try:
- if not is_main_thread:
- worker_checking_level_lock.acquire()
- if self.recursive_apply_level >= current_max_recursive_level.value:
- with need_pool_or_done_cond:
- # Only the main thread is allowed to create new processes -
- # otherwise, we will run into some Python bugs.
- if is_main_thread:
- self._CreateNewConsumerPool(process_count, thread_count)
- else:
- # Notify the main thread that we need a new consumer pool.
- new_pool_needed.value = 1
- need_pool_or_done_cond.notify_all()
- # The main thread will notify us when it finishes.
- need_pool_or_done_cond.wait()
- finally:
- if not is_main_thread:
- worker_checking_level_lock.release()
-
- # If we're running in this process, create a separate task queue. Otherwise,
- # if Apply has already been called with process_count > 1, then there will
- # be consumer pools trying to use our processes.
- if process_count > 1:
- task_queue = task_queues[self.recursive_apply_level]
- else:
- task_queue = _NewMultiprocessingQueue()
-
- # Kick off a producer thread to throw tasks in the global task queue. We
- # do this asynchronously so that the main thread can be free to create new
- # consumer pools when needed (otherwise, any thread with a task that needs
- # a new consumer pool must block until we're completely done producing; in
- # the worst case, every worker blocks on such a call and the producer fills
- # up the task queue before it finishes, so we block forever).
- producer_thread = ProducerThread(copy.copy(self), args_iterator, caller_id,
- func, task_queue, should_return_results,
- exception_handler, arg_checker,
- fail_on_error)
-
- if process_count > 1:
- # Wait here until either:
- # 1. We're the main thread and someone needs a new consumer pool - in
- # which case we create one and continue waiting.
- # 2. Someone notifies us that all of the work we requested is done, in
- # which case we retrieve the results (if applicable) and stop
- # waiting.
- while True:
- with need_pool_or_done_cond:
- # Either our call is done, or someone needs a new level of consumer
- # pools, or we the wakeup call was meant for someone else. It's
- # impossible for both conditions to be true, since the main thread is
- # blocked on any other ongoing calls to Apply, and a thread would not
- # ask for a new consumer pool unless it had more work to do.
- if call_completed_map[caller_id]:
- break
- elif is_main_thread and new_pool_needed.value:
- new_pool_needed.value = 0
- self._CreateNewConsumerPool(process_count, thread_count)
- need_pool_or_done_cond.notify_all()
-
- # Note that we must check the above conditions before the wait() call;
- # otherwise, the notification can happen before we start waiting, in
- # which case we'll block forever.
- need_pool_or_done_cond.wait()
- else: # Using a single process.
- self._ApplyThreads(thread_count, process_count,
- self.recursive_apply_level,
- is_blocking_call=True, task_queue=task_queue)
-
- # We encountered an exception from the producer thread before any arguments
- # were enqueued, but it wouldn't have been propagated, so we'll now
- # explicitly raise it here.
- if producer_thread.unknown_exception:
- # pylint: disable=raising-bad-type
- raise producer_thread.unknown_exception
-
- # We encountered an exception from the producer thread while iterating over
- # the arguments, so raise it here if we're meant to fail on error.
- if producer_thread.iterator_exception and fail_on_error:
- # pylint: disable=raising-bad-type
- raise producer_thread.iterator_exception
-
- def _ApplyThreads(self, thread_count, process_count, recursive_apply_level,
- is_blocking_call=False, task_queue=None):
- """Assigns the work from the multi-process global task queue.
-
- Work is assigned to an individual process for later consumption either by
- the WorkerThreads or (if thread_count == 1) this thread.
-
- Args:
- thread_count: The number of threads used to perform the work. If 1, then
- perform all work in this thread.
- process_count: The number of processes used to perform the work.
- recursive_apply_level: The depth in the tree of recursive calls to Apply
- of this thread.
- is_blocking_call: True iff the call to Apply is blocked on this call
- (which is true iff process_count == 1), implying that
- _ApplyThreads must behave as a blocking call.
- """
- self._ResetConnectionPool()
- self.recursive_apply_level = recursive_apply_level
-
- task_queue = task_queue or task_queues[recursive_apply_level]
-
- assert thread_count * process_count > 1, (
- 'Invalid state, calling command._ApplyThreads with only one thread '
- 'and process.')
- worker_pool = WorkerPool(
- thread_count, self.logger,
- bucket_storage_uri_class=self.bucket_storage_uri_class,
- gsutil_api_map=self.gsutil_api_map, debug=self.debug)
-
- num_enqueued = 0
- while True:
- task = task_queue.get()
- if task.args != ZERO_TASKS_TO_DO_ARGUMENT:
- # If we have no tasks to do and we're performing a blocking call, we
- # need a special signal to tell us to stop - otherwise, we block on
- # the call to task_queue.get() forever.
- worker_pool.AddTask(task)
- num_enqueued += 1
-
- if is_blocking_call:
- num_to_do = total_tasks[task.caller_id]
- # The producer thread won't enqueue the last task until after it has
- # updated total_tasks[caller_id], so we know that num_to_do < 0 implies
- # we will do this check again.
- if num_to_do >= 0 and num_enqueued == num_to_do:
- if thread_count == 1:
- return
- else:
- while True:
- with need_pool_or_done_cond:
- if call_completed_map[task.caller_id]:
- # We need to check this first, in case the condition was
- # notified before we grabbed the lock.
- return
- need_pool_or_done_cond.wait()
-
-
-# Below here lie classes and functions related to controlling the flow of tasks
-# between various threads and processes.
-
-
-class _ConsumerPool(object):
-
- def __init__(self, processes, task_queue):
- self.processes = processes
- self.task_queue = task_queue
-
- def ShutDown(self):
- for process in self.processes:
- KillProcess(process.pid)
-
-
-def KillProcess(pid):
- """Make best effort to kill the given process.
-
- We ignore all exceptions so a caller looping through a list of processes will
- continue attempting to kill each, even if one encounters a problem.
-
- Args:
- pid: The process ID.
- """
- try:
- # os.kill doesn't work in 2.X or 3.Y on Windows for any X < 7 or Y < 2.
- if IS_WINDOWS and ((2, 6) <= sys.version_info[:3] < (2, 7) or
- (3, 0) <= sys.version_info[:3] < (3, 2)):
- kernel32 = ctypes.windll.kernel32
- handle = kernel32.OpenProcess(1, 0, pid)
- kernel32.TerminateProcess(handle, 0)
- else:
- os.kill(pid, signal.SIGKILL)
- except: # pylint: disable=bare-except
- pass
-
-
-class Task(namedtuple('Task', (
- 'func args caller_id exception_handler should_return_results arg_checker '
- 'fail_on_error'))):
- """Task class representing work to be completed.
-
- Args:
- func: The function to be executed.
- args: The arguments to func.
- caller_id: The globally-unique caller ID corresponding to the Apply call.
- exception_handler: The exception handler to use if the call to func fails.
- should_return_results: True iff the results of this function should be
- returned from the Apply call.
- arg_checker: Used to determine whether we should process the current
- argument or simply skip it. Also handles any logging that
- is specific to a particular type of argument.
- fail_on_error: If true, then raise any exceptions encountered when
- executing func. This is only applicable in the case of
- process_count == thread_count == 1.
- """
- pass
-
-
-class ProducerThread(threading.Thread):
- """Thread used to enqueue work for other processes and threads."""
-
- def __init__(self, cls, args_iterator, caller_id, func, task_queue,
- should_return_results, exception_handler, arg_checker,
- fail_on_error):
- """Initializes the producer thread.
-
- Args:
- cls: Instance of Command for which this ProducerThread was created.
- args_iterator: Iterable collection of arguments to be put into the
- work queue.
- caller_id: Globally-unique caller ID corresponding to this call to Apply.
- func: The function to be called on each element of args_iterator.
- task_queue: The queue into which tasks will be put, to later be consumed
- by Command._ApplyThreads.
- should_return_results: True iff the results for this call to command.Apply
- were requested.
- exception_handler: The exception handler to use when errors are
- encountered during calls to func.
- arg_checker: Used to determine whether we should process the current
- argument or simply skip it. Also handles any logging that
- is specific to a particular type of argument.
- fail_on_error: If true, then raise any exceptions encountered when
- executing func. This is only applicable in the case of
- process_count == thread_count == 1.
- """
- super(ProducerThread, self).__init__()
- self.func = func
- self.cls = cls
- self.args_iterator = args_iterator
- self.caller_id = caller_id
- self.task_queue = task_queue
- self.arg_checker = arg_checker
- self.exception_handler = exception_handler
- self.should_return_results = should_return_results
- self.fail_on_error = fail_on_error
- self.shared_variables_updater = _SharedVariablesUpdater()
- self.daemon = True
- self.unknown_exception = None
- self.iterator_exception = None
- self.start()
-
- def run(self):
- num_tasks = 0
- cur_task = None
- last_task = None
- try:
- args_iterator = iter(self.args_iterator)
- while True:
- try:
- args = args_iterator.next()
- except StopIteration, e:
- break
- except Exception, e: # pylint: disable=broad-except
- _IncrementFailureCount()
- if self.fail_on_error:
- self.iterator_exception = e
- raise
- else:
- try:
- self.exception_handler(self.cls, e)
- except Exception, _: # pylint: disable=broad-except
- self.cls.logger.debug(
- 'Caught exception while handling exception for %s:\n%s',
- self.func, traceback.format_exc())
- self.shared_variables_updater.Update(self.caller_id, self.cls)
- continue
-
- if self.arg_checker(self.cls, args):
- num_tasks += 1
- last_task = cur_task
- cur_task = Task(self.func, args, self.caller_id,
- self.exception_handler, self.should_return_results,
- self.arg_checker, self.fail_on_error)
- if last_task:
- self.task_queue.put(last_task)
- except Exception, e: # pylint: disable=broad-except
- # This will also catch any exception raised due to an error in the
- # iterator when fail_on_error is set, so check that we failed for some
- # other reason before claiming that we had an unknown exception.
- if not self.iterator_exception:
- self.unknown_exception = e
- finally:
- # We need to make sure to update total_tasks[caller_id] before we enqueue
- # the last task. Otherwise, a worker can retrieve the last task and
- # complete it, then check total_tasks and determine that we're not done
- # producing all before we update total_tasks. This approach forces workers
- # to wait on the last task until after we've updated total_tasks.
- total_tasks[self.caller_id] = num_tasks
- if not cur_task:
- # This happens if there were zero arguments to be put in the queue.
- cur_task = Task(None, ZERO_TASKS_TO_DO_ARGUMENT, self.caller_id,
- None, None, None, None)
- self.task_queue.put(cur_task)
-
- # It's possible that the workers finished before we updated total_tasks,
- # so we need to check here as well.
- _NotifyIfDone(self.caller_id,
- caller_id_finished_count.Get(self.caller_id))
-
-
-class WorkerPool(object):
- """Pool of worker threads to which tasks can be added."""
-
- def __init__(self, thread_count, logger, bucket_storage_uri_class=None,
- gsutil_api_map=None, debug=0):
- self.task_queue = _NewThreadsafeQueue()
- self.threads = []
- for _ in range(thread_count):
- worker_thread = WorkerThread(
- self.task_queue, logger,
- bucket_storage_uri_class=bucket_storage_uri_class,
- gsutil_api_map=gsutil_api_map, debug=debug)
- self.threads.append(worker_thread)
- worker_thread.start()
-
- def AddTask(self, task):
- self.task_queue.put(task)
-
-
-class WorkerThread(threading.Thread):
- """Thread where all the work will be performed.
-
- This makes the function calls for Apply and takes care of all error handling,
- return value propagation, and shared_vars.
-
- Note that this thread is NOT started upon instantiation because the function-
- calling logic is also used in the single-threaded case.
- """
-
- def __init__(self, task_queue, logger, bucket_storage_uri_class=None,
- gsutil_api_map=None, debug=0):
- """Initializes the worker thread.
-
- Args:
- task_queue: The thread-safe queue from which this thread should obtain
- its work.
- logger: Logger to use for this thread.
- bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
- Settable for testing/mocking.
- gsutil_api_map: Map of providers and API selector tuples to api classes
- which can be used to communicate with those providers.
- Used for the instantiating CloudApiDelegator class.
- debug: debug level for the CloudApiDelegator class.
- """
- super(WorkerThread, self).__init__()
- self.task_queue = task_queue
- self.daemon = True
- self.cached_classes = {}
- self.shared_vars_updater = _SharedVariablesUpdater()
-
- self.thread_gsutil_api = None
- if bucket_storage_uri_class and gsutil_api_map:
- self.thread_gsutil_api = CloudApiDelegator(
- bucket_storage_uri_class, gsutil_api_map, logger, debug=debug)
-
- def PerformTask(self, task, cls):
- """Makes the function call for a task.
-
- Args:
- task: The Task to perform.
- cls: The instance of a class which gives context to the functions called
- by the Task's function. E.g., see SetAclFuncWrapper.
- """
- caller_id = task.caller_id
- try:
- results = task.func(cls, task.args, thread_state=self.thread_gsutil_api)
- if task.should_return_results:
- global_return_values_map.Update(caller_id, [results], default_value=[])
- except Exception, e: # pylint: disable=broad-except
- _IncrementFailureCount()
- if task.fail_on_error:
- raise # Only happens for single thread and process case.
- else:
- try:
- task.exception_handler(cls, e)
- except Exception, _: # pylint: disable=broad-except
- # Don't allow callers to raise exceptions here and kill the worker
- # threads.
- cls.logger.debug(
- 'Caught exception while handling exception for %s:\n%s',
- task, traceback.format_exc())
- finally:
- self.shared_vars_updater.Update(caller_id, cls)
-
- # Even if we encounter an exception, we still need to claim that that
- # the function finished executing. Otherwise, we won't know when to
- # stop waiting and return results.
- num_done = caller_id_finished_count.Update(caller_id, 1)
-
- if cls.multiprocessing_is_available:
- _NotifyIfDone(caller_id, num_done)
-
- def run(self):
- while True:
- task = self.task_queue.get()
- caller_id = task.caller_id
-
- # Get the instance of the command with the appropriate context.
- cls = self.cached_classes.get(caller_id, None)
- if not cls:
- cls = copy.copy(class_map[caller_id])
- cls.logger = CreateGsutilLogger(cls.command_name)
- self.cached_classes[caller_id] = cls
-
- self.PerformTask(task, cls)
-
-
-class _SharedVariablesUpdater(object):
- """Used to update shared variable for a class in the global map.
-
- Note that each thread will have its own instance of the calling class for
- context, and it will also have its own instance of a
- _SharedVariablesUpdater. This is used in the following way:
-
- 1. Before any tasks are performed, each thread will get a copy of the
- calling class, and the globally-consistent value of this shared variable
- will be initialized to whatever it was before the call to Apply began.
-
- 2. After each time a thread performs a task, it will look at the current
- values of the shared variables in its instance of the calling class.
-
- 2.A. For each such variable, it computes the delta of this variable
- between the last known value for this class (which is stored in
- a dict local to this class) and the current value of the variable
- in the class.
-
- 2.B. Using this delta, we update the last known value locally as well
- as the globally-consistent value shared across all classes (the
- globally consistent value is simply increased by the computed
- delta).
- """
-
- def __init__(self):
- self.last_shared_var_values = {}
-
- def Update(self, caller_id, cls):
- """Update any shared variables with their deltas."""
- shared_vars = shared_vars_list_map.get(caller_id, None)
- if shared_vars:
- for name in shared_vars:
- key = (caller_id, name)
- last_value = self.last_shared_var_values.get(key, 0)
- # Compute the change made since the last time we updated here. This is
- # calculated by simply subtracting the last known value from the current
- # value in the class instance.
- delta = getattr(cls, name) - last_value
- self.last_shared_var_values[key] = delta + last_value
-
- # Update the globally-consistent value by simply increasing it by the
- # computed delta.
- shared_vars_map.Update(key, delta)
-
-
-def _NotifyIfDone(caller_id, num_done):
- """Notify any threads waiting for results that something has finished.
-
- Each waiting thread will then need to check the call_completed_map to see if
- its work is done.
-
- Note that num_done could be calculated here, but it is passed in as an
- optimization so that we have one less call to a globally-locked data
- structure.
-
- Args:
- caller_id: The caller_id of the function whose progress we're checking.
- num_done: The number of tasks currently completed for that caller_id.
- """
- num_to_do = total_tasks[caller_id]
- if num_to_do == num_done and num_to_do >= 0:
- # Notify the Apply call that's sleeping that it's ready to return.
- with need_pool_or_done_cond:
- call_completed_map[caller_id] = True
- need_pool_or_done_cond.notify_all()
-
-
-def ShutDownGsutil():
- """Shut down all processes in consumer pools in preparation for exiting."""
- for q in queues:
- try:
- q.cancel_join_thread()
- except: # pylint: disable=bare-except
- pass
- for consumer_pool in consumer_pools:
- consumer_pool.ShutDown()
-
-
-# pylint: disable=global-variable-undefined
-def _IncrementFailureCount():
- global failure_count
- if isinstance(failure_count, int):
- failure_count += 1
- else: # Otherwise it's a multiprocessing.Value() of type 'i'.
- failure_count.value += 1
-
-
-# pylint: disable=global-variable-undefined
-def GetFailureCount():
- """Returns the number of failures processed during calls to Apply()."""
- try:
- if isinstance(failure_count, int):
- return failure_count
- else: # It's a multiprocessing.Value() of type 'i'.
- return failure_count.value
- except NameError: # If it wasn't initialized, Apply() wasn't called.
- return 0
-
-
-def ResetFailureCount():
- """Resets the failure_count variable to 0 - useful if error is expected."""
- try:
- global failure_count
- if isinstance(failure_count, int):
- failure_count = 0
- else: # It's a multiprocessing.Value() of type 'i'.
- failure_count = multiprocessing.Value('i', 0)
- except NameError: # If it wasn't initialized, Apply() wasn't called.
- pass

Powered by Google App Engine
This is Rietveld 408576698