Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(5)

Unified Diff: gslib/command.py

Issue 698893003: Update checked in version of gsutil to version 4.6 (Closed) Base URL: http://dart.googlecode.com/svn/third_party/gsutil/
Patch Set: Created 6 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « gslib/cloud_api_helper.py ('k') | gslib/command_runner.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: gslib/command.py
===================================================================
--- gslib/command.py (revision 33376)
+++ gslib/command.py (working copy)
@@ -1,3 +1,4 @@
+# -*- coding: utf-8 -*-
# Copyright 2010 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -11,70 +12,83 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
"""Base class for gsutil commands.
In addition to base class code, this file contains helpers that depend on base
-class state (such as GetAclCommandHelper) In general, functions that depend on
+class state (such as GetAndPrintAcl). In general, functions that depend on
class state and that are used by multiple commands belong in this file.
Functions that don't depend on class state belong in util.py, and non-shared
helpers belong in individual subclasses.
"""
-import boto
+from __future__ import absolute_import
+
import codecs
+from collections import namedtuple
import copy
import getopt
-import gslib
+from getopt import GetoptError
import logging
import multiprocessing
import os
import Queue
-import re
import signal
import sys
import textwrap
import threading
import traceback
-import wildcard_iterator
-import xml.dom.minidom
-from boto import handler
-from boto.exception import GSResponseError
+import boto
from boto.storage_uri import StorageUri
-from collections import namedtuple
-from getopt import GetoptError
-from gslib import util
+import gslib
+from gslib.cloud_api import AccessDeniedException
+from gslib.cloud_api import ArgumentException
+from gslib.cloud_api import ServiceException
+from gslib.cloud_api_delegator import CloudApiDelegator
+from gslib.cs_api_map import ApiSelector
+from gslib.cs_api_map import GsutilApiMapFactory
from gslib.exception import CommandException
from gslib.help_provider import HelpProvider
from gslib.name_expansion import NameExpansionIterator
-from gslib.name_expansion import NameExpansionIteratorQueue
+from gslib.name_expansion import NameExpansionResult
from gslib.parallelism_framework_util import AtomicIncrementDict
from gslib.parallelism_framework_util import BasicIncrementDict
from gslib.parallelism_framework_util import ThreadAndProcessSafeDict
-from gslib.project_id import ProjectIdHandler
-from gslib.storage_uri_builder import StorageUriBuilder
+from gslib.plurality_checkable_iterator import PluralityCheckableIterator
+from gslib.storage_url import StorageUrlFromString
+from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
+from gslib.translation_helper import AclTranslation
from gslib.util import GetConfigFilePath
+from gslib.util import HaveFileUrls
+from gslib.util import HaveProviderUrls
from gslib.util import IS_WINDOWS
from gslib.util import MultiprocessingIsAvailable
from gslib.util import NO_MAX
-from gslib.wildcard_iterator import ContainsWildcard
-from oauth2client.client import HAS_CRYPTO
+from gslib.util import UrlsAreForSingleProvider
+from gslib.util import UTF8
+from gslib.wildcard_iterator import CreateWildcardIterator
+OFFER_GSUTIL_M_SUGGESTION_THRESHOLD = 5
+
if IS_WINDOWS:
- import ctypes
+ import ctypes # pylint: disable=g-import-not-at-top
def _DefaultExceptionHandler(cls, e):
cls.logger.exception(e)
+
def CreateGsutilLogger(command_name):
- """Creates a logger that resembles 'print' output, but is thread safe and
- abides by gsutil -d/-D/-DD/-q options.
+ """Creates a logger that resembles 'print' output.
+ This logger abides by gsutil -d/-D/-DD/-q options.
+
By default (if none of the above options is specified) the logger will display
all messages logged with level INFO or above. Log propagation is disabled.
+ Args:
+ command_name: Command name to create logger for.
+
Returns:
A logger object.
"""
@@ -89,25 +103,26 @@
log.addHandler(log_handler)
return log
-def _UriArgChecker(command_instance, uri):
- exp_src_uri = command_instance.suri_builder.StorageUri(
- uri.GetExpandedUriStr())
- command_instance.logger.debug('process %d is handling uri %s',
- os.getpid(), exp_src_uri)
- if (command_instance.exclude_symlinks and exp_src_uri.is_file_uri()
- and os.path.islink(exp_src_uri.object_name)):
- command_instance.logger.info('Skipping symbolic link %s...', exp_src_uri)
+
+def _UrlArgChecker(command_instance, url):
+ if not command_instance.exclude_symlinks:
+ return True
+ exp_src_url = url.expanded_storage_url
+ if exp_src_url.IsFileUrl() and os.path.islink(exp_src_url.object_name):
+ command_instance.logger.info('Skipping symbolic link %s...', exp_src_url)
return False
return True
-def DummyArgChecker(command_instance, arg):
+def DummyArgChecker(*unused_args):
return True
-def _SetAclFuncWrapper(cls, name_expansion_result):
- return cls._SetAclFunc(name_expansion_result)
-def _SetAclExceptionHandler(cls, e):
+def SetAclFuncWrapper(cls, name_expansion_result, thread_state=None):
+ return cls.SetAclFunc(name_expansion_result, thread_state=thread_state)
+
+
+def SetAclExceptionHandler(cls, e):
"""Exception handler that maintains state about post-completion status."""
cls.logger.error(str(e))
cls.everything_set_okay = False
@@ -120,27 +135,18 @@
# However, this also lets us shut down somewhat more cleanly when interrupted.
queues = []
+
def _NewMultiprocessingQueue():
queue = multiprocessing.Queue(MAX_QUEUE_SIZE)
queues.append(queue)
return queue
+
def _NewThreadsafeQueue():
queue = Queue.Queue(MAX_QUEUE_SIZE)
queues.append(queue)
return queue
-
-# command_spec key constants.
-COMMAND_NAME = 'command_name'
-COMMAND_NAME_ALIASES = 'command_name_aliases'
-MIN_ARGS = 'min_args'
-MAX_ARGS = 'max_args'
-SUPPORTED_SUB_ARGS = 'supported_sub_args'
-FILE_URIS_OK = 'file_uri_ok'
-PROVIDER_URIS_OK = 'provider_uri_ok'
-URIS_START_ARG = 'uris_start_arg'
-
# The maximum size of a process- or thread-safe queue. Imposing this limit
# prevents us from needing to hold an arbitrary amount of data in memory.
# However, setting this number too high (e.g., >= 32768 on OS X) can cause
@@ -152,7 +158,7 @@
# causing problems with infinite recursion, and it can be increased if needed.
MAX_RECURSIVE_DEPTH = 5
-ZERO_TASKS_TO_DO_ARGUMENT = ("There were no", "tasks to do")
+ZERO_TASKS_TO_DO_ARGUMENT = ('There were no', 'tasks to do')
# Map from deprecated aliases to the current command and subcommands that
# provide the same behavior.
@@ -177,27 +183,29 @@
# Declare all of the module level variables - see
# InitializeMultiprocessingVariables for an explanation of why this is
# necessary.
+# pylint: disable=global-at-module-level
global manager, consumer_pools, task_queues, caller_id_lock, caller_id_counter
global total_tasks, call_completed_map, global_return_values_map
global need_pool_or_done_cond, caller_id_finished_count, new_pool_needed
global current_max_recursive_level, shared_vars_map, shared_vars_list_map
-global class_map
+global class_map, worker_checking_level_lock, failure_count
def InitializeMultiprocessingVariables():
- """
- Used to initialize module-level variables that will be inherited by
- subprocesses. On Windows, a multiprocessing.Manager object should only
+ """Initializes module-level variables that will be inherited by subprocesses.
+
+ On Windows, a multiprocessing.Manager object should only
be created within an "if __name__ == '__main__':" block. This function
must be called, otherwise every command that calls Command.Apply will fail.
"""
# This list of global variables must exactly match the above list of
# declarations.
+ # pylint: disable=global-variable-undefined
global manager, consumer_pools, task_queues, caller_id_lock, caller_id_counter
global total_tasks, call_completed_map, global_return_values_map
global need_pool_or_done_cond, caller_id_finished_count, new_pool_needed
global current_max_recursive_level, shared_vars_map, shared_vars_list_map
- global class_map
+ global class_map, worker_checking_level_lock, failure_count
manager = multiprocessing.Manager()
@@ -225,6 +233,10 @@
# that a call to Apply needs a new set of consumer processes.
need_pool_or_done_cond = manager.Condition()
+ # Lock used to prevent multiple worker processes from asking the main thread
+ # to create a new consumer pool for the same level.
+ worker_checking_level_lock = manager.Lock()
+
# Map from caller_id to the current number of completed tasks for that ID.
caller_id_finished_count = AtomicIncrementDict(manager)
@@ -242,83 +254,47 @@
# Map from caller_id to calling class.
class_map = manager.dict()
+ # Number of tasks that resulted in an exception in calls to Apply().
+ failure_count = multiprocessing.Value('i', 0)
-class Command(object):
- REQUIRED_SPEC_KEYS = [COMMAND_NAME]
- # Each subclass must define the following map, minimally including the
- # keys in REQUIRED_SPEC_KEYS; other values below will be used as defaults,
- # although for readbility subclasses should specify the complete map.
- command_spec = {
+# Each subclass of Command must define a property named 'command_spec' that is
+# an instance of the following class.
+CommandSpec = namedtuple('CommandSpec', [
# Name of command.
- COMMAND_NAME : None,
+ 'command_name',
# List of command name aliases.
- COMMAND_NAME_ALIASES : [],
+ 'command_name_aliases',
# Min number of args required by this command.
- MIN_ARGS : 0,
+ 'min_args',
# Max number of args required by this command, or NO_MAX.
- MAX_ARGS : NO_MAX,
+ 'max_args',
# Getopt-style string specifying acceptable sub args.
- SUPPORTED_SUB_ARGS : '',
- # True if file URIs are acceptable for this command.
- FILE_URIS_OK : False,
- # True if provider-only URIs are acceptable for this command.
- PROVIDER_URIS_OK : False,
- # Index in args of first URI arg.
- URIS_START_ARG : 0,
- }
- _default_command_spec = command_spec
- help_spec = HelpProvider.help_spec
- _commands_with_subcommands_and_subopts = ['acl', 'defacl', 'logging', 'web']
+ 'supported_sub_args',
+ # True if file URLs are acceptable for this command.
+ 'file_url_ok',
+ # True if provider-only URLs are acceptable for this command.
+ 'provider_url_ok',
+ # Index in args of first URL arg.
+ 'urls_start_arg',
+  # List of supported APIs.
+  'gs_api_support',
+  # Default API to use for this command.
+  'gs_default_api',
+  # Private arguments (for internal testing).
+  'supported_private_args',
+])
- """Define an empty test specification, which derived classes must populate.
- This is a list of tuples containing the following values:
+class Command(HelpProvider):
+ """Base class for all gsutil commands."""
- step_name - mnemonic name for test, displayed when test is run
- cmd_line - shell command line to run test
- expect_ret or None - expected return code from test (None means ignore)
- (result_file, expect_file) or None - tuple of result file and expected
- file to diff for additional test
- verification beyond the return code
- (None means no diff requested)
- Notes:
+ # Each subclass must override this with an instance of CommandSpec.
+ command_spec = None
- - Setting expected_ret to None means there is no expectation and,
- hence, any returned value will pass.
+ _commands_with_subcommands_and_subopts = ['acl', 'defacl', 'logging', 'web',
+ 'notification']
- - Any occurrences of the string 'gsutil' in the cmd_line parameter
- are expanded to the full path to the gsutil command under test.
-
- - The cmd_line, result_file and expect_file parameters may
- contain the following special substrings:
-
- $Bn - converted to one of 10 unique-for-testing bucket names (n=0..9)
- $On - converted to one of 10 unique-for-testing object names (n=0..9)
- $Fn - converted to one of 10 unique-for-testing file names (n=0..9)
- $G - converted to the directory where gsutil is installed. Useful for
- referencing test data.
-
- - The generated file names are full pathnames, whereas the generated
- bucket and object names are simple relative names.
-
- - Tests with a non-None result_file and expect_file automatically
- trigger an implicit diff of the two files.
-
- - These test specifications, in combination with the conversion strings
- allow tests to be constructed parametrically. For example, here's an
- annotated subset of a test_steps for the cp command:
-
- # Copy local file to object, verify 0 return code.
- ('simple cp', 'gsutil cp $F1 gs://$B1/$O1', 0, None, None),
- # Copy uploaded object back to local file and diff vs. orig file.
- ('verify cp', 'gsutil cp gs://$B1/$O1 $F2', 0, '$F2', '$F1'),
-
- - After pattern substitution, the specs are run sequentially, in the
- order in which they appear in the test_steps list.
- """
- test_steps = []
-
# This keeps track of the recursive depth of the current call to Apply.
recursive_apply_level = 0
@@ -326,48 +302,71 @@
# of the caller_id.
sequential_caller_id = -1
+ @staticmethod
+ def CreateCommandSpec(command_name, command_name_aliases=None, min_args=0,
+ max_args=NO_MAX, supported_sub_args='',
+ file_url_ok=False, provider_url_ok=False,
+ urls_start_arg=0, gs_api_support=None,
+ gs_default_api=None, supported_private_args=None):
+ """Creates an instance of CommandSpec, with defaults."""
+ return CommandSpec(
+ command_name=command_name,
+ command_name_aliases=command_name_aliases or [],
+ min_args=min_args,
+ max_args=max_args,
+ supported_sub_args=supported_sub_args,
+ file_url_ok=file_url_ok,
+ provider_url_ok=provider_url_ok,
+ urls_start_arg=urls_start_arg,
+ gs_api_support=gs_api_support or [ApiSelector.XML],
+ gs_default_api=gs_default_api or ApiSelector.XML,
+ supported_private_args=supported_private_args)
+
# Define a convenience property for command name, since it's used many places.
def _GetDefaultCommandName(self):
- return self.command_spec[COMMAND_NAME]
+ return self.command_spec.command_name
command_name = property(_GetDefaultCommandName)
-
- def _CalculateUrisStartArg(self):
- """Calculate the index in args of the first URI arg. By default, just use
- the value from command_spec.
+
+ def _CalculateUrlsStartArg(self):
+ """Calculate the index in args of the first URL arg.
+
+ Returns:
+ Index of the first URL arg (according to the command spec).
"""
- return self.command_spec[URIS_START_ARG]
+ return self.command_spec.urls_start_arg
def _TranslateDeprecatedAliases(self, args):
- """For commands that have deprecated aliases, this will map the aliases to
- the corresponding new command and also warn the user about deprecation.
- """
+ """Map deprecated aliases to the corresponding new command, and warn."""
new_command_args = OLD_ALIAS_MAP.get(self.command_alias_used, None)
if new_command_args:
# Prepend any subcommands for the new command. The command name itself
# is not part of the args, so leave it out.
args = new_command_args[1:] + args
- self.logger.warn('\n'.join(textwrap.wrap((
- 'You are using a deprecated alias, "%(used_alias)s", for the '
- '"%(command_name)s" command. This will stop working on 9/9/2014. '
- 'Please use "%(command_name)s" with the appropriate sub-command in '
- 'the future. See "gsutil help %(command_name)s" for details.') %
+ self.logger.warn('\n'.join(textwrap.wrap(
+ ('You are using a deprecated alias, "%(used_alias)s", for the '
+ '"%(command_name)s" command. This will stop working on 9/9/2014. '
+ 'Please use "%(command_name)s" with the appropriate sub-command in '
+ 'the future. See "gsutil help %(command_name)s" for details.') %
{'used_alias': self.command_alias_used,
- 'command_name': self.command_name })))
+ 'command_name': self.command_name})))
return args
def __init__(self, command_runner, args, headers, debug, parallel_operations,
- config_file_list, bucket_storage_uri_class, test_method=None,
- logging_filters=None, command_alias_used=None):
- """
+ bucket_storage_uri_class, gsutil_api_class_map_factory,
+ test_method=None, logging_filters=None,
+ command_alias_used=None):
+ """Instantiates a Command.
+
Args:
command_runner: CommandRunner (for commands built atop other commands).
args: Command-line args (arg0 = actual arg, not command name ala bash).
headers: Dictionary containing optional HTTP headers to pass to boto.
debug: Debug level to pass in to boto connection (range 0..3).
parallel_operations: Should command operations be executed in parallel?
- config_file_list: Config file list returned by GetBotoConfigFileList().
bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
Settable for testing/mocking.
+ gsutil_api_class_map_factory: Creates map of cloud storage interfaces.
+ Settable for testing/mocking.
test_method: Optional general purpose method for testing purposes.
Application and semantics of this method will vary by
command and test type.
@@ -389,8 +388,8 @@
self.headers = headers
self.debug = debug
self.parallel_operations = parallel_operations
- self.config_file_list = config_file_list
self.bucket_storage_uri_class = bucket_storage_uri_class
+ self.gsutil_api_class_map_factory = gsutil_api_class_map_factory
self.test_method = test_method
self.exclude_symlinks = False
self.recursion_requested = False
@@ -400,61 +399,59 @@
# Global instance of a threaded logger object.
self.logger = CreateGsutilLogger(self.command_name)
if logging_filters:
- for filter in logging_filters:
- self.logger.addFilter(filter)
+ for log_filter in logging_filters:
+ self.logger.addFilter(log_filter)
- # Process sub-command instance specifications.
- # First, ensure subclass implementation sets all required keys.
- for k in self.REQUIRED_SPEC_KEYS:
- if k not in self.command_spec or self.command_spec[k] is None:
- raise CommandException('"%s" command implementation is missing %s '
- 'specification' % (self.command_name, k))
- # Now override default command_spec with subclass-specified values.
- tmp = self._default_command_spec
- tmp.update(self.command_spec)
- self.command_spec = tmp
- del tmp
+ if self.command_spec is None:
+ raise CommandException('"%s" command implementation is missing a '
+ 'command_spec definition.' % self.command_name)
- # Make sure command provides a test specification.
- if not self.test_steps:
- # TODO: Uncomment following lines when test feature is ready.
- #raise CommandException('"%s" command implementation is missing test '
- #'specification' % self.command_name)
- pass
-
# Parse and validate args.
args = self._TranslateDeprecatedAliases(args)
try:
(self.sub_opts, self.args) = getopt.getopt(
- args, self.command_spec[SUPPORTED_SUB_ARGS])
+ args, self.command_spec.supported_sub_args,
+ self.command_spec.supported_private_args or [])
except GetoptError, e:
raise CommandException('%s for "%s" command.' % (e.msg,
self.command_name))
- self.command_spec[URIS_START_ARG] = self._CalculateUrisStartArg()
-
- if (len(self.args) < self.command_spec[MIN_ARGS]
- or len(self.args) > self.command_spec[MAX_ARGS]):
+ # Named tuple public functions start with _
+ # pylint: disable=protected-access
+ self.command_spec = self.command_spec._replace(
+ urls_start_arg=self._CalculateUrlsStartArg())
+
+ if (len(self.args) < self.command_spec.min_args
+ or len(self.args) > self.command_spec.max_args):
self._RaiseWrongNumberOfArgumentsException()
- if not (self.command_name in
- self._commands_with_subcommands_and_subopts):
+ if self.command_name not in self._commands_with_subcommands_and_subopts:
self.CheckArguments()
-
- self.proj_id_handler = ProjectIdHandler()
- self.suri_builder = StorageUriBuilder(debug, bucket_storage_uri_class)
+ # Build the support and default maps from the command spec.
+ support_map = {
+ 'gs': self.command_spec.gs_api_support,
+ 's3': [ApiSelector.XML]
+ }
+ default_map = {
+ 'gs': self.command_spec.gs_default_api,
+ 's3': ApiSelector.XML
+ }
+ self.gsutil_api_map = GsutilApiMapFactory.GetApiMap(
+ self.gsutil_api_class_map_factory, support_map, default_map)
+
+ self.project_id = None
+ self.gsutil_api = CloudApiDelegator(
+ bucket_storage_uri_class, self.gsutil_api_map,
+ self.logger, debug=self.debug)
+
# Cross-platform path to run gsutil binary.
self.gsutil_cmd = ''
- # Cross-platform list containing gsutil path for use with subprocess.
- self.gsutil_exec_list = []
# If running on Windows, invoke python interpreter explicitly.
if gslib.util.IS_WINDOWS:
self.gsutil_cmd += 'python '
- self.gsutil_exec_list += ['python']
# Add full path to gsutil to make sure we test the correct version.
self.gsutil_path = gslib.GSUTIL_PATH
self.gsutil_cmd += self.gsutil_path
- self.gsutil_exec_list += [self.gsutil_path]
# We're treating recursion_requested like it's used by all commands, but
# only some of the commands accept the -R option.
@@ -467,58 +464,63 @@
self.multiprocessing_is_available = MultiprocessingIsAvailable()[0]
def _RaiseWrongNumberOfArgumentsException(self):
- """Raise an exception indicating that the wrong number of arguments was
- provided for this command.
- """
- if len(self.args) > self.command_spec[MAX_ARGS]:
+ """Raises exception for wrong number of arguments supplied to command."""
+ if len(self.args) > self.command_spec.max_args:
message = ('The %s command accepts at most %d arguments.' %
- (self.command_name, self.command_spec[MAX_ARGS]))
- elif len(self.args) < self.command_spec[MIN_ARGS]:
+ (self.command_name, self.command_spec.max_args))
+ elif len(self.args) < self.command_spec.min_args:
message = ('The %s command requires at least %d arguments.' %
- (self.command_name, self.command_spec[MIN_ARGS]))
+ (self.command_name, self.command_spec.min_args))
raise CommandException(message)
def CheckArguments(self):
- """Checks that the arguments provided on the command line fit the
- expectations of the command_spec. Any commands in
- self._commands_with_subcommands_and_subopts are responsible for calling
- this method after handling initial parsing of their arguments.
- This prevents commands with sub-commands as well as options from breaking
- the parsing of getopt.
+ """Checks that command line arguments match the command_spec.
- TODO: Provide a function to parse commands and sub-commands more
- intelligently once we stop allowing the deprecated command versions.
+ Any commands in self._commands_with_subcommands_and_subopts are responsible
+ for calling this method after handling initial parsing of their arguments.
+ This prevents commands with sub-commands as well as options from breaking
+ the parsing of getopt.
+
+ TODO: Provide a function to parse commands and sub-commands more
+ intelligently once we stop allowing the deprecated command versions.
+
+ Raises:
+ CommandException if the arguments don't match.
"""
- if (not self.command_spec[FILE_URIS_OK]
- and self.HaveFileUris(self.args[self.command_spec[URIS_START_ARG]:])):
- raise CommandException('"%s" command does not support "file://" URIs. '
- 'Did you mean to use a gs:// URI?' %
+ if (not self.command_spec.file_url_ok
+ and HaveFileUrls(self.args[self.command_spec.urls_start_arg:])):
+ raise CommandException('"%s" command does not support "file://" URLs. '
+ 'Did you mean to use a gs:// URL?' %
self.command_name)
- if (not self.command_spec[PROVIDER_URIS_OK]
- and self._HaveProviderUris(
- self.args[self.command_spec[URIS_START_ARG]:])):
+ if (not self.command_spec.provider_url_ok
+ and HaveProviderUrls(self.args[self.command_spec.urls_start_arg:])):
raise CommandException('"%s" command does not support provider-only '
- 'URIs.' % self.command_name)
+ 'URLs.' % self.command_name)
- def WildcardIterator(self, uri_or_str, all_versions=False):
- """
- Helper to instantiate gslib.WildcardIterator. Args are same as
- gslib.WildcardIterator interface, but this method fills in most of the
- values from instance state.
+ def WildcardIterator(self, url_string, all_versions=False):
+ """Helper to instantiate gslib.WildcardIterator.
+ Args are same as gslib.WildcardIterator interface, but this method fills in
+ most of the values from instance state.
+
Args:
- uri_or_str: StorageUri or URI string naming wildcard objects to iterate.
+ url_string: URL string naming wildcard objects to iterate.
+ all_versions: If true, the iterator yields all versions of objects
+ matching the wildcard. If false, yields just the live
+ object version.
+
+ Returns:
+ WildcardIterator for use by caller.
"""
- return wildcard_iterator.wildcard_iterator(
- uri_or_str, self.proj_id_handler,
- bucket_storage_uri_class=self.bucket_storage_uri_class,
- all_versions=all_versions,
- headers=self.headers, debug=self.debug)
+ return CreateWildcardIterator(
+ url_string, self.gsutil_api, all_versions=all_versions,
+ debug=self.debug, project_id=self.project_id)
def RunCommand(self):
- """Abstract function in base class. Subclasses must implement this. The
- return value of this function will be used as the exit status of the
+ """Abstract function in base class. Subclasses must implement this.
+
+ The return value of this function will be used as the exit status of the
process, so subclass commands should return an integer exit code (0 for
success, a value in [1,255] for failure).
"""
@@ -529,141 +531,174 @@
# Shared helper functions that depend on base class state. #
############################################################
- def UrisAreForSingleProvider(self, uri_args):
- """Tests whether the uris are all for a single provider.
+ def ApplyAclFunc(self, acl_func, acl_excep_handler, url_strs):
+ """Sets the standard or default object ACL depending on self.command_name.
- Returns: a StorageUri for one of the uris on success, None on failure.
+ Args:
+ acl_func: ACL function to be passed to Apply.
+ acl_excep_handler: ACL exception handler to be passed to Apply.
+ url_strs: URL strings on which to set ACL.
+
+ Raises:
+ CommandException if an ACL could not be set.
"""
- provider = None
- uri = None
- for uri_str in uri_args:
- # validate=False because we allow wildcard uris.
- uri = boto.storage_uri(
- uri_str, debug=self.debug, validate=False,
- bucket_storage_uri_class=self.bucket_storage_uri_class)
- if not provider:
- provider = uri.scheme
- elif uri.scheme != provider:
- return None
- return uri
-
- def _SetAclFunc(self, name_expansion_result):
- exp_src_uri = self.suri_builder.StorageUri(
- name_expansion_result.GetExpandedUriStr())
- # We don't do bucket operations multi-threaded (see comment below).
- assert self.command_name != 'defacl'
- self.logger.info('Setting ACL on %s...' %
- name_expansion_result.expanded_uri_str)
- try:
- if self.canned:
- exp_src_uri.set_acl(self.acl_arg, exp_src_uri.object_name, False,
- self.headers)
+ multi_threaded_url_args = []
+ # Handle bucket ACL setting operations single-threaded, because
+ # our threading machinery currently assumes it's working with objects
+ # (name_expansion_iterator), and normally we wouldn't expect users to need
+ # to set ACLs on huge numbers of buckets at once anyway.
+ for url_str in url_strs:
+ url = StorageUrlFromString(url_str)
+ if url.IsCloudUrl() and url.IsBucket():
+ if self.recursion_requested:
+ # If user specified -R option, convert any bucket args to bucket
+ # wildcards (e.g., gs://bucket/*), to prevent the operation from
+ # being applied to the buckets themselves.
+ url.object_name = '*'
+ multi_threaded_url_args.append(url.url_string)
+ else:
+ # Convert to a NameExpansionResult so we can re-use the threaded
+ # function for the single-threaded implementation. RefType is unused.
+ for blr in self.WildcardIterator(url.url_string).IterBuckets(
+ bucket_fields=['id']):
+ name_expansion_for_url = NameExpansionResult(
+ url, False, False, blr.storage_url)
+ acl_func(self, name_expansion_for_url)
else:
- exp_src_uri.set_xml_acl(self.acl_arg, exp_src_uri.object_name, False,
- self.headers)
- except GSResponseError as e:
- if self.continue_on_error:
- exc_name, message, detail = util.ParseErrorDetail(e)
- self.everything_set_okay = False
- sys.stderr.write(util.FormatErrorMessage(
- exc_name, e.status, e.code, e.reason, message, detail))
- else:
+ multi_threaded_url_args.append(url_str)
+
+ if len(multi_threaded_url_args) >= 1:
+ name_expansion_iterator = NameExpansionIterator(
+ self.command_name, self.debug,
+ self.logger, self.gsutil_api,
+ multi_threaded_url_args, self.recursion_requested,
+ all_versions=self.all_versions,
+ continue_on_error=self.continue_on_error or self.parallel_operations)
+
+ # Perform requests in parallel (-m) mode, if requested, using
+ # configured number of parallel processes and threads. Otherwise,
+ # perform requests with sequential function calls in current process.
+ self.Apply(acl_func, name_expansion_iterator, acl_excep_handler,
+ fail_on_error=not self.continue_on_error)
+
+ if not self.everything_set_okay and not self.continue_on_error:
+ raise CommandException('ACLs for some objects could not be set.')
+
+ def SetAclFunc(self, name_expansion_result, thread_state=None):
+ """Sets the object ACL for the name_expansion_result provided.
+
+ Args:
+ name_expansion_result: NameExpansionResult describing the target object.
+ thread_state: If present, use this gsutil Cloud API instance for the set.
+ """
+ if thread_state:
+ assert not self.def_acl
+ gsutil_api = thread_state
+ else:
+ gsutil_api = self.gsutil_api
+ op_string = 'default object ACL' if self.def_acl else 'ACL'
+ url = name_expansion_result.expanded_storage_url
+ self.logger.info('Setting %s on %s...', op_string, url)
+ if ((gsutil_api.GetApiSelector(url.scheme) == ApiSelector.XML
+ and url.scheme != 'gs') or self.canned):
+ # If we are using canned ACLs or interacting with a non-google ACL
+ # model, we need to use the XML passthrough. acl_arg should either
+ # be a canned ACL or an XML ACL.
+ try:
+ # No canned ACL support in JSON, force XML API to be used.
+ orig_prefer_api = gsutil_api.prefer_api
+ gsutil_api.prefer_api = ApiSelector.XML
+ gsutil_api.XmlPassThroughSetAcl(
+ self.acl_arg, url, canned=self.canned,
+ def_obj_acl=self.def_acl, provider=url.scheme)
+ gsutil_api.prefer_api = orig_prefer_api
+ except ServiceException as e:
+ if self.continue_on_error:
+ self.everything_set_okay = False
+ self.logger.error(e)
+ else:
+ raise
+ else: # Normal Cloud API path. ACL is a JSON ACL.
+ try:
+ if url.IsBucket():
+ if self.def_acl:
+ def_obj_acl = AclTranslation.JsonToMessage(
+ self.acl_arg, apitools_messages.ObjectAccessControl)
+ bucket_metadata = apitools_messages.Bucket(
+ defaultObjectAcl=def_obj_acl)
+ gsutil_api.PatchBucket(url.bucket_name, bucket_metadata,
+ provider=url.scheme, fields=['id'])
+ else:
+ bucket_acl = AclTranslation.JsonToMessage(
+ self.acl_arg, apitools_messages.BucketAccessControl)
+ bucket_metadata = apitools_messages.Bucket(acl=bucket_acl)
+ gsutil_api.PatchBucket(url.bucket_name, bucket_metadata,
+ provider=url.scheme, fields=['id'])
+ else: # url.IsObject()
+ object_acl = AclTranslation.JsonToMessage(
+ self.acl_arg, apitools_messages.ObjectAccessControl)
+ object_metadata = apitools_messages.Object(acl=object_acl)
+ gsutil_api.PatchObjectMetadata(url.bucket_name, url.object_name,
+ object_metadata, provider=url.scheme,
+ generation=url.generation)
+ except ArgumentException, e:
raise
+ except ServiceException, e:
+ raise
- def SetAclCommandHelper(self):
+ def SetAclCommandHelper(self, acl_func, acl_excep_handler):
+ """Sets ACLs on the self.args using the passed-in acl function.
+
+ Args:
+ acl_func: ACL function to be passed to Apply.
+ acl_excep_handler: ACL exception handler to be passed to Apply.
"""
- Common logic for setting ACLs. Sets the standard ACL or the default
- object ACL depending on self.command_name.
- """
-
- self.acl_arg = self.args[0]
- uri_args = self.args[1:]
- # Disallow multi-provider acl set requests, because there are differences in
+ acl_arg = self.args[0]
+ url_args = self.args[1:]
+ # Disallow multi-provider setacl requests, because there are differences in
# the ACL models.
- storage_uri = self.UrisAreForSingleProvider(uri_args)
- if not storage_uri:
+ if not UrlsAreForSingleProvider(url_args):
raise CommandException('"%s" command spanning providers not allowed.' %
self.command_name)
# Determine whether acl_arg names a file containing XML ACL text vs. the
# string name of a canned ACL.
- if os.path.isfile(self.acl_arg):
- with codecs.open(self.acl_arg, 'r', 'utf-8') as f:
- self.acl_arg = f.read()
+ if os.path.isfile(acl_arg):
+ with codecs.open(acl_arg, 'r', UTF8) as f:
+ acl_arg = f.read()
self.canned = False
else:
# No file exists, so expect a canned ACL string.
+ # Canned ACLs are not supported in JSON and we need to use the XML API
+ # to set them.
+ # validate=False because we allow wildcard urls.
+ storage_uri = boto.storage_uri(
+ url_args[0], debug=self.debug, validate=False,
+ bucket_storage_uri_class=self.bucket_storage_uri_class)
+
canned_acls = storage_uri.canned_acls()
- if self.acl_arg not in canned_acls:
- raise CommandException('Invalid canned ACL "%s".' % self.acl_arg)
+ if acl_arg not in canned_acls:
+ raise CommandException('Invalid canned ACL "%s".' % acl_arg)
self.canned = True
# Used to track if any ACLs failed to be set.
self.everything_set_okay = True
+ self.acl_arg = acl_arg
- # If user specified -R option, convert any bucket args to bucket wildcards
- # (e.g., gs://bucket/*), to prevent the operation from being applied to
- # the buckets themselves.
- if self.recursion_requested:
- for i in range(len(uri_args)):
- uri = self.suri_builder.StorageUri(uri_args[i])
- if uri.names_bucket():
- uri_args[i] = uri.clone_replace_name('*').uri
- else:
- # Handle bucket ACL setting operations single-threaded, because
- # our threading machinery currently assumes it's working with objects
- # (name_expansion_iterator), and normally we wouldn't expect users to need
- # to set ACLs on huge numbers of buckets at once anyway.
- for i in range(len(uri_args)):
- uri_str = uri_args[i]
- if self.suri_builder.StorageUri(uri_str).names_bucket():
- self._RunSingleThreadedSetAcl(self.acl_arg, uri_args)
- return
-
- name_expansion_iterator = NameExpansionIterator(
- self.command_name, self.proj_id_handler, self.headers, self.debug,
- self.logger, self.bucket_storage_uri_class, uri_args,
- self.recursion_requested, self.recursion_requested,
- all_versions=self.all_versions)
- # Perform requests in parallel (-m) mode, if requested, using
- # configured number of parallel processes and threads. Otherwise,
- # perform requests with sequential function calls in current process.
- self.Apply(_SetAclFuncWrapper, name_expansion_iterator,
- _SetAclExceptionHandler)
-
+ self.ApplyAclFunc(acl_func, acl_excep_handler, url_args)
if not self.everything_set_okay and not self.continue_on_error:
raise CommandException('ACLs for some objects could not be set.')
- def _RunSingleThreadedSetAcl(self, acl_arg, uri_args):
- some_matched = False
- for uri_str in uri_args:
- for blr in self.WildcardIterator(uri_str):
- if blr.HasPrefix():
- continue
- some_matched = True
- uri = blr.GetUri()
- if self.command_name == 'defacl':
- self.logger.info('Setting default object ACL on %s...', uri)
- if self.canned:
- uri.set_def_acl(acl_arg, uri.object_name, False, self.headers)
- else:
- uri.set_def_xml_acl(acl_arg, False, self.headers)
- else:
- self.logger.info('Setting ACL on %s...', uri)
- if self.canned:
- uri.set_acl(acl_arg, uri.object_name, False, self.headers)
- else:
- uri.set_xml_acl(acl_arg, uri.object_name, False, self.headers)
- if not some_matched:
- raise CommandException('No URIs matched')
-
def _WarnServiceAccounts(self):
- """Warns service account users who have received an AccessDenied error for
- one of the metadata-related commands to make sure that they are listed as
- Owners in the API console."""
+ """Warns service account users who have received an AccessDenied error.
- # Import this here so that the value will be set first in oauth2_plugin.
- from gslib.third_party.oauth2_plugin.oauth2_plugin import IS_SERVICE_ACCOUNT
+    When one of the metadata-related commands fails due to AccessDenied, the
+    user must ensure that they are listed as an Owner in the API console.
+ """
+ # Import this here so that the value will be set first in
+ # gcs_oauth2_boto_plugin.
+ # pylint: disable=g-import-not-at-top
+ from gcs_oauth2_boto_plugin.oauth2_plugin import IS_SERVICE_ACCOUNT
if IS_SERVICE_ACCOUNT:
# This method is only called when canned ACLs are used, so the warning
@@ -676,58 +711,91 @@
'address is listed as an Owner in the Team tab of the API console. '
'See "gsutil help creds" for further information.\n')))
- def GetAclCommandHelper(self):
- """Common logic for getting ACLs. Gets the standard ACL or the default
- object ACL depending on self.command_name."""
+ def GetAndPrintAcl(self, url_str):
+ """Prints the standard or default object ACL depending on self.command_name.
- # Resolve to just one object.
- # Handle wildcard-less URI specially in case this is a version-specific
- # URI, because WildcardIterator().IterUris() would lose the versioning info.
- if not ContainsWildcard(self.args[0]):
- uri = self.suri_builder.StorageUri(self.args[0])
+ Args:
+ url_str: URL string to get ACL for.
+ """
+ blr = self.GetAclCommandBucketListingReference(url_str)
+ url = StorageUrlFromString(url_str)
+ if (self.gsutil_api.GetApiSelector(url.scheme) == ApiSelector.XML
+ and url.scheme != 'gs'):
+ # Need to use XML passthrough.
+ try:
+ acl = self.gsutil_api.XmlPassThroughGetAcl(
+ url, def_obj_acl=self.def_acl, provider=url.scheme)
+ print acl.to_xml()
+ except AccessDeniedException, _:
+ self._WarnServiceAccounts()
+ raise
else:
- uris = list(self.WildcardIterator(self.args[0]).IterUris())
- if len(uris) == 0:
- raise CommandException('No URIs matched')
- if len(uris) != 1:
- raise CommandException('%s matched more than one URI, which is not '
- 'allowed by the %s command' % (self.args[0], self.command_name))
- uri = uris[0]
- if not uri.names_bucket() and not uri.names_object():
- raise CommandException('"%s" command must specify a bucket or '
- 'object.' % self.command_name)
- if self.command_name == 'defacl':
- acl = uri.get_def_acl(False, self.headers)
- else:
- acl = uri.get_acl(False, self.headers)
- # Pretty-print the XML to make it more easily human editable.
- parsed_xml = xml.dom.minidom.parseString(acl.to_xml().encode('utf-8'))
- print parsed_xml.toprettyxml(indent=' ').encode('utf-8')
+ if self.command_name == 'defacl':
+ acl = blr.root_object.defaultObjectAcl
+ if not acl:
+ self.logger.warn(
+ 'No default object ACL present for %s. This could occur if '
+ 'the default object ACL is private, in which case objects '
+ 'created in this bucket will be readable only by their '
+ 'creators. It could also mean you do not have OWNER permission '
+ 'on %s and therefore do not have permission to read the '
+ 'default object ACL.', url_str, url_str)
+ else:
+ acl = blr.root_object.acl
+ if not acl:
+ self._WarnServiceAccounts()
+ raise AccessDeniedException('Access denied. Please ensure you have '
+ 'OWNER permission on %s.' % url_str)
+ print AclTranslation.JsonFromMessage(acl)
- def GetXmlSubresource(self, subresource, uri_arg):
- """Print an xml subresource, e.g. logging, for a bucket/object.
+ def GetAclCommandBucketListingReference(self, url_str):
+ """Gets a single bucket listing reference for an acl get command.
Args:
- subresource: The subresource name.
- uri_arg: URI for the bucket/object. Wildcards will be expanded.
+ url_str: URL string to get the bucket listing reference for.
+ Returns:
+ BucketListingReference for the URL string.
+
Raises:
- CommandException: if errors encountered.
+ CommandException if string did not result in exactly one reference.
"""
- # Wildcarding is allowed but must resolve to just one bucket.
- uris = list(self.WildcardIterator(uri_arg).IterUris())
- if len(uris) != 1:
- raise CommandException('Wildcards must resolve to exactly one item for '
- 'get %s' % subresource)
- uri = uris[0]
- xml_str = uri.get_subresource(subresource, False, self.headers)
- # Pretty-print the XML to make it more easily human editable.
- parsed_xml = xml.dom.minidom.parseString(xml_str.encode('utf-8'))
- print parsed_xml.toprettyxml(indent=' ')
+ # We're guaranteed by caller that we have the appropriate type of url
+ # string for the call (ex. we will never be called with an object string
+ # by getdefacl)
+ wildcard_url = StorageUrlFromString(url_str)
+ if wildcard_url.IsObject():
+ plurality_iter = PluralityCheckableIterator(
+ self.WildcardIterator(url_str).IterObjects(
+ bucket_listing_fields=['acl']))
+ else:
+ # Bucket or provider. We call IterBuckets explicitly here to ensure that
+ # the root object is populated with the acl.
+ if self.command_name == 'defacl':
+ bucket_fields = ['defaultObjectAcl']
+ else:
+ bucket_fields = ['acl']
+ plurality_iter = PluralityCheckableIterator(
+ self.WildcardIterator(url_str).IterBuckets(
+ bucket_fields=bucket_fields))
+ if plurality_iter.IsEmpty():
+ raise CommandException('No URLs matched')
+ if plurality_iter.HasPlurality():
+ raise CommandException(
+ '%s matched more than one URL, which is not allowed by the %s '
+ 'command' % (url_str, self.command_name))
+ return list(plurality_iter)[0]
- def _HandleMultiProcessingControlC(self, signal_num, cur_stack_frame):
- """Called when user hits ^C during a multi-processing/multi-threaded
- request, so we can kill the subprocesses."""
+ def _HandleMultiProcessingControlC(self, unused_signal_num,
+ unused_cur_stack_frame):
+ """Called when user hits ^C during a multi-process/multi-thread request.
+
+ Kills subprocesses.
+
+ Args:
+ unused_signal_num: signal generated by ^C.
+ unused_cur_stack_frame: Current stack frame.
+ """
# Note: This only works under Linux/MacOS. See
# https://github.com/GoogleCloudPlatform/gsutil/issues/99 for details
# about why making it work correctly across OS's is harder and still open.
@@ -736,38 +804,57 @@
# Simply calling sys.exit(1) doesn't work - see above bug for details.
KillProcess(os.getpid())
- def HaveFileUris(self, args_to_check):
- """Checks whether args_to_check contain any file URIs.
+ def GetSingleBucketUrlFromArg(self, arg, bucket_fields=None):
+ """Gets a single bucket URL based on the command arguments.
Args:
- args_to_check: Command-line argument subset to check.
+ arg: String argument to get bucket URL for.
+ bucket_fields: Fields to populate for the bucket.
Returns:
- True if args_to_check contains any file URIs.
+ (StorageUrl referring to a single bucket, Bucket metadata).
+
+ Raises:
+ CommandException if args did not match exactly one bucket.
"""
- for uri_str in args_to_check:
- if uri_str.lower().startswith('file://') or uri_str.find(':') == -1:
- return True
- return False
+ plurality_checkable_iterator = self.GetBucketUrlIterFromArg(
+ arg, bucket_fields=bucket_fields)
+ if plurality_checkable_iterator.HasPlurality():
+ raise CommandException(
+ '%s matched more than one URL, which is not\n'
+ 'allowed by the %s command' % (arg, self.command_name))
+ blr = list(plurality_checkable_iterator)[0]
+ return StorageUrlFromString(blr.url_string), blr.root_object
- ######################
- # Private functions. #
- ######################
+ def GetBucketUrlIterFromArg(self, arg, bucket_fields=None):
+    """Gets an iterator over buckets based on the command argument.
- def _HaveProviderUris(self, args_to_check):
- """Checks whether args_to_check contains any provider URIs (like 'gs://').
-
Args:
- args_to_check: Command-line argument subset to check.
+ arg: String argument to iterate over.
+ bucket_fields: Fields to populate for the bucket.
Returns:
- True if args_to_check contains any provider URIs.
+ PluralityCheckableIterator over buckets.
+
+ Raises:
+ CommandException if iterator matched no buckets.
"""
- for uri_str in args_to_check:
- if re.match('^[a-z]+://$', uri_str):
- return True
- return False
-
+ arg_url = StorageUrlFromString(arg)
+ if not arg_url.IsCloudUrl() or arg_url.IsObject():
+ raise CommandException('"%s" command must specify a bucket' %
+ self.command_name)
+
+ plurality_checkable_iterator = PluralityCheckableIterator(
+ self.WildcardIterator(arg).IterBuckets(
+ bucket_fields=bucket_fields))
+ if plurality_checkable_iterator.IsEmpty():
+ raise CommandException('No URLs matched')
+ return plurality_checkable_iterator
+
+ ######################
+ # Private functions. #
+ ######################
+
def _ResetConnectionPool(self):
# Each OS process needs to establish its own set of connections to
# the server to avoid writes from different OS processes interleaving
@@ -781,9 +868,10 @@
def _GetProcessAndThreadCount(self, process_count, thread_count,
parallel_operations_override):
- """
- Determines the values of process_count and thread_count that we should
- actually use. If we're not performing operations in parallel, then ignore
+ """Determines the values of process_count and thread_count.
+
+ These values are used for parallel operations.
+ If we're not performing operations in parallel, then ignore
existing values and use process_count = thread_count = 1.
Args:
@@ -822,23 +910,23 @@
thread_count = 1
if IS_WINDOWS and process_count > 1:
- raise CommandException('\n'.join(textwrap.wrap((
- 'It is not possible to set process_count > 1 on Windows. Please '
- 'update your config file (located at %s) and set '
- '"process_count = 1".') %
+ raise CommandException('\n'.join(textwrap.wrap(
+ ('It is not possible to set process_count > 1 on Windows. Please '
+ 'update your config file (located at %s) and set '
+ '"parallel_process_count = 1".') %
GetConfigFilePath())))
self.logger.debug('process count: %d', process_count)
self.logger.debug('thread count: %d', thread_count)
-
+
return (process_count, thread_count)
def _SetUpPerCallerState(self):
"""Set up the state for a caller id, corresponding to one Apply call."""
# Get a new caller ID.
with caller_id_lock:
- caller_id_counter.value = caller_id_counter.value + 1
+ caller_id_counter.value += 1
caller_id = caller_id_counter.value
-
+
# Create a copy of self with an incremented recursive level. This allows
# the class to report its level correctly if the function called from it
# also needs to call Apply.
@@ -849,12 +937,16 @@
# recreate it later in the WorkerThread. This is not a problem since any
# logger with the same name will be treated as a singleton.
cls.logger = None
+
+ # Likewise, the default API connection can't be pickled, but it is unused
+ # anyway as each thread gets its own API delegator.
+ cls.gsutil_api = None
+
class_map[caller_id] = cls
-
total_tasks[caller_id] = -1 # -1 => the producer hasn't finished yet.
call_completed_map[caller_id] = False
- caller_id_finished_count.put(caller_id, 0)
- global_return_values_map.put(caller_id, [])
+ caller_id_finished_count.Put(caller_id, 0)
+ global_return_values_map.Put(caller_id, [])
return caller_id
def _CreateNewConsumerPool(self, num_processes, num_threads):
@@ -863,14 +955,14 @@
task_queue = _NewMultiprocessingQueue()
task_queues.append(task_queue)
- current_max_recursive_level.value = current_max_recursive_level.value + 1
+ current_max_recursive_level.value += 1
if current_max_recursive_level.value > MAX_RECURSIVE_DEPTH:
raise CommandException('Recursion depth of Apply calls is too great.')
- for shard in range(num_processes):
+ for _ in range(num_processes):
recursive_apply_level = len(consumer_pools)
p = multiprocessing.Process(
target=self._ApplyThreads,
- args=(num_threads, recursive_apply_level, shard))
+ args=(num_threads, num_processes, recursive_apply_level))
p.daemon = True
processes.append(p)
p.start()
@@ -878,14 +970,11 @@
consumer_pools.append(consumer_pool)
def Apply(self, func, args_iterator, exception_handler,
- shared_attrs=None, arg_checker=_UriArgChecker,
+ shared_attrs=None, arg_checker=_UrlArgChecker,
parallel_operations_override=False, process_count=None,
thread_count=None, should_return_results=False,
fail_on_error=False):
- """
- Determines whether the necessary parts of the multiprocessing module are
- available, and delegates to _ParallelApply or _SequentialApply as
- appropriate.
+ """Calls _Parallel/SequentialApply based on multiprocessing availability.
Args:
func: Function to call to process each argument.
@@ -908,6 +997,9 @@
fail_on_error: If true, then raise any exceptions encountered when
executing func. This is only applicable in the case of
process_count == thread_count == 1.
+
+ Returns:
+ Results from spawned threads.
"""
if shared_attrs:
original_shared_vars_values = {} # We'll add these back in at the end.
@@ -919,13 +1011,14 @@
(process_count, thread_count) = self._GetProcessAndThreadCount(
process_count, thread_count, parallel_operations_override)
+
is_main_thread = (self.recursive_apply_level == 0
and self.sequential_caller_id == -1)
# We don't honor the fail_on_error flag in the case of multiple threads
# or processes.
fail_on_error = fail_on_error and (process_count * thread_count == 1)
-
+
# Only check this from the first call in the main thread. Apart from the
# fact that it's wasteful to try this multiple times in general, it also
# will never work when called from a subprocess since we use daemon
@@ -945,31 +1038,31 @@
else:
self.sequential_caller_id += 1
caller_id = self.sequential_caller_id
-
+
if is_main_thread:
- global global_return_values_map, shared_vars_map
+ # pylint: disable=global-variable-undefined
+ global global_return_values_map, shared_vars_map, failure_count
global caller_id_finished_count, shared_vars_list_map
global_return_values_map = BasicIncrementDict()
- global_return_values_map.put(caller_id, [])
+ global_return_values_map.Put(caller_id, [])
shared_vars_map = BasicIncrementDict()
caller_id_finished_count = BasicIncrementDict()
shared_vars_list_map = {}
+ failure_count = 0
-
# If any shared attributes passed by caller, create a dictionary of
# shared memory variables for every element in the list of shared
# attributes.
if shared_attrs:
shared_vars_list_map[caller_id] = shared_attrs
for name in shared_attrs:
- shared_vars_map.put((caller_id, name), 0)
+ shared_vars_map.Put((caller_id, name), 0)
# Make all of the requested function calls.
if self.multiprocessing_is_available and thread_count * process_count > 1:
self._ParallelApply(func, args_iterator, exception_handler, caller_id,
- arg_checker, parallel_operations_override,
- process_count, thread_count, should_return_results,
- fail_on_error)
+ arg_checker, process_count, thread_count,
+ should_return_results, fail_on_error)
else:
self._SequentialApply(func, args_iterator, exception_handler, caller_id,
arg_checker, should_return_results, fail_on_error)
@@ -978,61 +1071,89 @@
for name in shared_attrs:
# This allows us to retain the original value of the shared variable,
# and simply apply the delta after what was done during the call to
- # apply.
+ # apply.
final_value = (original_shared_vars_values[name] +
- shared_vars_map.get((caller_id, name)))
+ shared_vars_map.Get((caller_id, name)))
setattr(self, name, final_value)
if should_return_results:
- return global_return_values_map.get(caller_id)
+ return global_return_values_map.Get(caller_id)
+ def _MaybeSuggestGsutilDashM(self):
+    """Outputs a suggestion to the user to use gsutil -m."""
+ if not (boto.config.getint('GSUtil', 'parallel_process_count', 0) == 1 and
+ boto.config.getint('GSUtil', 'parallel_thread_count', 0) == 1):
+ self.logger.warn('\n' + textwrap.fill(
+ '==> NOTE: You are performing a sequence of gsutil operations that '
+ 'may run significantly faster if you instead use gsutil -m %s ...\n'
+ 'Please see the -m section under "gsutil help options" for further '
+ 'information about when gsutil -m can be advantageous.'
+ % sys.argv[1]) + '\n')
+
+ # pylint: disable=g-doc-args
def _SequentialApply(self, func, args_iterator, exception_handler, caller_id,
arg_checker, should_return_results, fail_on_error):
+ """Performs all function calls sequentially in the current thread.
+
+ No other threads or processes will be spawned. This degraded functionality
+ is used when the multiprocessing module is not available or the user
+ requests only one thread and one process.
"""
- Perform all function calls sequentially in the current thread. No other
- threads or processes will be spawned. This degraded functionality is only
- for use when the multiprocessing module is not available for some reason.
- """
-
# Create a WorkerThread to handle all of the logic needed to actually call
# the function. Note that this thread will never be started, and all work
# is done in the current thread.
worker_thread = WorkerThread(None, False)
args_iterator = iter(args_iterator)
+ # Count of sequential calls that have been made. Used for producing
+ # suggestion to use gsutil -m.
+ sequential_call_count = 0
while True:
-
+
# Try to get the next argument, handling any exceptions that arise.
try:
args = args_iterator.next()
except StopIteration, e:
break
- except Exception, e:
+ except Exception, e: # pylint: disable=broad-except
+ _IncrementFailureCount()
if fail_on_error:
raise
else:
try:
exception_handler(self, e)
- except Exception, e1:
+ except Exception, _: # pylint: disable=broad-except
self.logger.debug(
'Caught exception while handling exception for %s:\n%s',
func, traceback.format_exc())
continue
+
+ sequential_call_count += 1
+ if sequential_call_count == OFFER_GSUTIL_M_SUGGESTION_THRESHOLD:
+ # Output suggestion near beginning of run, so user sees it early and can
+ # ^C and try gsutil -m.
+ self._MaybeSuggestGsutilDashM()
if arg_checker(self, args):
# Now that we actually have the next argument, perform the task.
task = Task(func, args, caller_id, exception_handler,
should_return_results, arg_checker, fail_on_error)
worker_thread.PerformTask(task, self)
+ if sequential_call_count >= gslib.util.GetTermLines():
+ # Output suggestion at end of long run, in case user missed it at the
+ # start and it scrolled off-screen.
+ self._MaybeSuggestGsutilDashM()
+ # pylint: disable=g-doc-args
def _ParallelApply(self, func, args_iterator, exception_handler, caller_id,
- arg_checker, parallel_operations_override, process_count,
- thread_count, should_return_results, fail_on_error):
- """
- Dispatch input arguments across a pool of parallel OS
- processes and/or Python threads, based on options (-m or not)
- and settings in the user's config file. If non-parallel mode
- or only one OS process requested, execute requests sequentially
- in the current OS process.
-
+ arg_checker, process_count, thread_count,
+ should_return_results, fail_on_error):
+ """Dispatches input arguments across a thread/process pool.
+
+ Pools are composed of parallel OS processes and/or Python threads,
+ based on options (-m or not) and settings in the user's config file.
+
+ If only one OS process is requested/available, dispatch requests across
+ threads in the current OS process.
+
In the multi-process case, we will create one pool of worker processes for
each level of the tree of recursive calls to Apply. E.g., if A calls
Apply(B), and B ultimately calls Apply(C) followed by Apply(D), then we
@@ -1050,7 +1171,7 @@
Apply's parallelism is generally broken up into 4 cases:
- If process_count == thread_count == 1, then all tasks will be executed
- in this thread.
+ by _SequentialApply.
- If process_count > 1 and thread_count == 1, then the main thread will
create a new pool of processes (if they don't already exist) and each of
those processes will execute the tasks in a single thread.
@@ -1064,14 +1185,14 @@
Args:
caller_id: The caller ID unique to this call to command.Apply.
See command.Apply for description of other arguments.
- """
+ """
is_main_thread = self.recursive_apply_level == 0
# Catch ^C under Linux/MacOs so we can do cleanup before exiting.
if not IS_WINDOWS and is_main_thread:
signal.signal(signal.SIGINT, self._HandleMultiProcessingControlC)
- if not len(task_queues):
+ if not task_queues:
# The process we create will need to access the next recursive level
# of task queues if it makes a call to Apply, so we always keep around
# one more queue than we know we need. OTOH, if we don't create a new
@@ -1080,18 +1201,28 @@
if process_count > 1: # Handle process pool creation.
# Check whether this call will need a new set of workers.
- with need_pool_or_done_cond:
+
+ # Each worker must acquire a shared lock before notifying the main thread
+ # that it needs a new worker pool, so that at most one worker asks for
+ # a new worker pool at once.
+ try:
+ if not is_main_thread:
+ worker_checking_level_lock.acquire()
if self.recursive_apply_level >= current_max_recursive_level.value:
- # Only the main thread is allowed to create new processes - otherwise,
- # we will run into some Python bugs.
- if is_main_thread:
- self._CreateNewConsumerPool(process_count, thread_count)
- else:
- # Notify the main thread that we need a new consumer pool.
- new_pool_needed.value = 1
- need_pool_or_done_cond.notify_all()
- # The main thread will notify us when it finishes.
- need_pool_or_done_cond.wait()
+ with need_pool_or_done_cond:
+ # Only the main thread is allowed to create new processes -
+ # otherwise, we will run into some Python bugs.
+ if is_main_thread:
+ self._CreateNewConsumerPool(process_count, thread_count)
+ else:
+ # Notify the main thread that we need a new consumer pool.
+ new_pool_needed.value = 1
+ need_pool_or_done_cond.notify_all()
+ # The main thread will notify us when it finishes.
+ need_pool_or_done_cond.wait()
+ finally:
+ if not is_main_thread:
+ worker_checking_level_lock.release()
# If we're running in this process, create a separate task queue. Otherwise,
# if Apply has already been called with process_count > 1, then there will
@@ -1132,52 +1263,63 @@
new_pool_needed.value = 0
self._CreateNewConsumerPool(process_count, thread_count)
need_pool_or_done_cond.notify_all()
-
+
# Note that we must check the above conditions before the wait() call;
# otherwise, the notification can happen before we start waiting, in
# which case we'll block forever.
need_pool_or_done_cond.wait()
else: # Using a single process.
- shard = 0
- self._ApplyThreads(thread_count, self.recursive_apply_level, shard,
+ self._ApplyThreads(thread_count, process_count,
+ self.recursive_apply_level,
is_blocking_call=True, task_queue=task_queue)
# We encountered an exception from the producer thread before any arguments
# were enqueued, but it wouldn't have been propagated, so we'll now
# explicitly raise it here.
if producer_thread.unknown_exception:
+ # pylint: disable=raising-bad-type
raise producer_thread.unknown_exception
# We encountered an exception from the producer thread while iterating over
# the arguments, so raise it here if we're meant to fail on error.
if producer_thread.iterator_exception and fail_on_error:
+ # pylint: disable=raising-bad-type
raise producer_thread.iterator_exception
- def _ApplyThreads(self, thread_count, recursive_apply_level, shard,
+ def _ApplyThreads(self, thread_count, process_count, recursive_apply_level,
is_blocking_call=False, task_queue=None):
- """
- Assigns the work from the global task queue shared among all processes
- to an individual process, for later consumption either by the WorkerThreads
- or in this thread if thread_count == 1.
-
+ """Assigns the work from the multi-process global task queue.
+
+ Work is assigned to an individual process for later consumption either by
+ the WorkerThreads or (if thread_count == 1) this thread.
+
Args:
thread_count: The number of threads used to perform the work. If 1, then
perform all work in this thread.
+ process_count: The number of processes used to perform the work.
recursive_apply_level: The depth in the tree of recursive calls to Apply
of this thread.
- shard: Assigned subset (shard number) for this function.
is_blocking_call: True iff the call to Apply is blocked on this call
(which is true iff process_count == 1), implying that
_ApplyThreads must behave as a blocking call.
"""
self._ResetConnectionPool()
-
+ self.recursive_apply_level = recursive_apply_level
+
task_queue = task_queue or task_queues[recursive_apply_level]
+ assert thread_count * process_count > 1, (
+ 'Invalid state, calling command._ApplyThreads with only one thread '
+ 'and process.')
if thread_count > 1:
- worker_pool = WorkerPool(thread_count)
- else:
- worker_pool = SameThreadWorkerPool(self)
+ worker_pool = WorkerPool(
+ thread_count, self.logger,
+ bucket_storage_uri_class=self.bucket_storage_uri_class,
+ gsutil_api_map=self.gsutil_api_map, debug=self.debug)
+ elif process_count > 1:
+ worker_pool = SameThreadWorkerPool(
+ self, bucket_storage_uri_class=self.bucket_storage_uri_class,
+ gsutil_api_map=self.gsutil_api_map, debug=self.debug)
num_enqueued = 0
while True:
@@ -1212,6 +1354,7 @@
class _ConsumerPool(object):
+
def __init__(self, processes, task_queue):
self.processes = processes
self.task_queue = task_queue
@@ -1229,7 +1372,7 @@
kernel32 = ctypes.windll.kernel32
handle = kernel32.OpenProcess(1, 0, pid)
kernel32.TerminateProcess(handle, 0)
- except:
+ except: # pylint: disable=bare-except
pass
else:
try:
@@ -1238,10 +1381,11 @@
pass
-class Task(namedtuple('Task',
- 'func args caller_id exception_handler should_return_results arg_checker ' +
- 'fail_on_error')):
- """
+class Task(namedtuple('Task', (
+ 'func args caller_id exception_handler should_return_results arg_checker '
+ 'fail_on_error'))):
+ """Task class representing work to be completed.
+
Args:
func: The function to be executed.
args: The arguments to func.
@@ -1261,10 +1405,12 @@
class ProducerThread(threading.Thread):
"""Thread used to enqueue work for other processes and threads."""
+
def __init__(self, cls, args_iterator, caller_id, func, task_queue,
should_return_results, exception_handler, arg_checker,
fail_on_error):
- """
+ """Initializes the producer thread.
+
Args:
cls: Instance of Command for which this ProducerThread was created.
args_iterator: Iterable collection of arguments to be put into the
@@ -1311,14 +1457,15 @@
args = args_iterator.next()
except StopIteration, e:
break
- except Exception, e:
+ except Exception, e: # pylint: disable=broad-except
+ _IncrementFailureCount()
if self.fail_on_error:
self.iterator_exception = e
raise
else:
try:
self.exception_handler(self.cls, e)
- except Exception, e1:
+ except Exception, _: # pylint: disable=broad-except
self.cls.logger.debug(
'Caught exception while handling exception for %s:\n%s',
self.func, traceback.format_exc())
@@ -1332,8 +1479,8 @@
self.exception_handler, self.should_return_results,
self.arg_checker, self.fail_on_error)
if last_task:
- self.task_queue.put(last_task, self.caller_id)
- except Exception, e:
+ self.task_queue.put(last_task)
+ except Exception, e: # pylint: disable=broad-except
# This will also catch any exception raised due to an error in the
# iterator when fail_on_error is set, so check that we failed for some
# other reason before claiming that we had an unknown exception.
@@ -1350,31 +1497,41 @@
# This happens if there were zero arguments to be put in the queue.
cur_task = Task(None, ZERO_TASKS_TO_DO_ARGUMENT, self.caller_id,
None, None, None, None)
- self.task_queue.put(cur_task, self.caller_id)
+ self.task_queue.put(cur_task)
# It's possible that the workers finished before we updated total_tasks,
# so we need to check here as well.
_NotifyIfDone(self.caller_id,
- caller_id_finished_count.get(self.caller_id))
-
+ caller_id_finished_count.Get(self.caller_id))
+
class SameThreadWorkerPool(object):
"""Behaves like a WorkerPool, but used for the single-threaded case."""
- def __init__(self, cls):
+
+ def __init__(self, cls, bucket_storage_uri_class=None,
+ gsutil_api_map=None, debug=0):
self.cls = cls
- self.worker_thread = WorkerThread(None)
-
+ self.worker_thread = WorkerThread(
+ None, cls.logger,
+ bucket_storage_uri_class=bucket_storage_uri_class,
+ gsutil_api_map=gsutil_api_map, debug=debug)
+
def AddTask(self, task):
self.worker_thread.PerformTask(task, self.cls)
class WorkerPool(object):
"""Pool of worker threads to which tasks can be added."""
- def __init__(self, thread_count):
+
+ def __init__(self, thread_count, logger, bucket_storage_uri_class=None,
+ gsutil_api_map=None, debug=0):
self.task_queue = _NewThreadsafeQueue()
self.threads = []
for _ in range(thread_count):
- worker_thread = WorkerThread(self.task_queue)
+ worker_thread = WorkerThread(
+ self.task_queue, logger,
+ bucket_storage_uri_class=bucket_storage_uri_class,
+ gsutil_api_map=gsutil_api_map, debug=debug)
self.threads.append(worker_thread)
worker_thread.start()
@@ -1383,52 +1540,63 @@
class WorkerThread(threading.Thread):
- """
- This thread is where all of the work will be performed in actually making the
- function calls for Apply. It takes care of all error handling, return value
- propagation, and shared_vars.
+ """Thread where all the work will be performed.
+ This makes the function calls for Apply and takes care of all error handling,
+ return value propagation, and shared_vars.
+
Note that this thread is NOT started upon instantiation because the function-
calling logic is also used in the single-threaded case.
"""
- def __init__(self, task_queue, multiprocessing_is_available=True):
- """
+
+ def __init__(self, task_queue, logger, bucket_storage_uri_class=None,
+ gsutil_api_map=None, debug=0):
+ """Initializes the worker thread.
+
Args:
task_queue: The thread-safe queue from which this thread should obtain
its work.
- multiprocessing_is_available: False iff the multiprocessing module is not
- available, in which case we're using
- _SequentialApply.
+ logger: Logger to use for this thread.
+ bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
+ Settable for testing/mocking.
+ gsutil_api_map: Map of providers and API selector tuples to api classes
+ which can be used to communicate with those providers.
+ Used for the instantiating CloudApiDelegator class.
+ debug: debug level for the CloudApiDelegator class.
"""
super(WorkerThread, self).__init__()
self.task_queue = task_queue
- self.multiprocessing_is_available = multiprocessing_is_available
self.daemon = True
self.cached_classes = {}
self.shared_vars_updater = _SharedVariablesUpdater()
+ self.thread_gsutil_api = None
+ if bucket_storage_uri_class and gsutil_api_map:
+ self.thread_gsutil_api = CloudApiDelegator(
+ bucket_storage_uri_class, gsutil_api_map, logger, debug=debug)
+
def PerformTask(self, task, cls):
- """
- Makes the function call for a task.
+ """Makes the function call for a task.
Args:
task: The Task to perform.
cls: The instance of a class which gives context to the functions called
- by the Task's function. E.g., see _SetAclFuncWrapper.
+ by the Task's function. E.g., see SetAclFuncWrapper.
"""
caller_id = task.caller_id
try:
- results = task.func(cls, task.args)
+ results = task.func(cls, task.args, thread_state=self.thread_gsutil_api)
if task.should_return_results:
- global_return_values_map.update(caller_id, [results], default_value=[])
- except Exception, e:
+ global_return_values_map.Update(caller_id, [results], default_value=[])
+ except Exception, e: # pylint: disable=broad-except
+ _IncrementFailureCount()
if task.fail_on_error:
raise # Only happens for single thread and process case.
else:
try:
task.exception_handler(cls, e)
- except Exception, e1:
- # Don't allow callers to throw exceptions here and kill the worker
+ except Exception, _: # pylint: disable=broad-except
+ # Don't allow callers to raise exceptions here and kill the worker
# threads.
cls.logger.debug(
'Caught exception while handling exception for %s:\n%s',
@@ -1436,13 +1604,14 @@
finally:
self.shared_vars_updater.Update(caller_id, cls)
- # Even if we encounter an exception, we still need to claim that that
- # the function finished executing. Otherwise, we won't know when to
- # stop waiting and return results.
- num_done = caller_id_finished_count.update(caller_id, 1)
- if self.multiprocessing_is_available:
- _NotifyIfDone(caller_id, num_done)
+      # Even if we encounter an exception, we still need to claim that
+ # the function finished executing. Otherwise, we won't know when to
+ # stop waiting and return results.
+ num_done = caller_id_finished_count.Update(caller_id, 1)
+ if cls.multiprocessing_is_available:
+ _NotifyIfDone(caller_id, num_done)
+
def run(self):
while True:
task = self.task_queue.get()
@@ -1459,11 +1628,12 @@
class _SharedVariablesUpdater(object):
- """Used to update shared variable for a class in the global map. Note that
- each thread will have its own instance of the calling class for context,
- and it will also have its own instance of a _SharedVariablesUpdater.
- This is used in the following way:
+  """Used to update a shared variable for a class in the global map.
+ Note that each thread will have its own instance of the calling class for
+ context, and it will also have its own instance of a
+ _SharedVariablesUpdater. This is used in the following way:
+
1. Before any tasks are performed, each thread will get a copy of the
calling class, and the globally-consistent value of this shared variable
will be initialized to whatever it was before the call to Apply began.
@@ -1480,7 +1650,8 @@
as the globally-consistent value shared across all classes (the
globally consistent value is simply increased by the computed
delta).
- """
+ """
+
def __init__(self):
self.last_shared_var_values = {}
@@ -1499,19 +1670,19 @@
# Update the globally-consistent value by simply increasing it by the
# computed delta.
- shared_vars_map.update(key, delta)
+ shared_vars_map.Update(key, delta)
def _NotifyIfDone(caller_id, num_done):
- """
- Notify any threads that are waiting for results that something has finished.
+ """Notify any threads waiting for results that something has finished.
+
Each waiting thread will then need to check the call_completed_map to see if
its work is done.
-
+
Note that num_done could be calculated here, but it is passed in as an
optimization so that we have one less call to a globally-locked data
structure.
-
+
Args:
caller_id: The caller_id of the function whose progress we're checking.
num_done: The number of tasks currently completed for that caller_id.
@@ -1523,13 +1694,46 @@
call_completed_map[caller_id] = True
need_pool_or_done_cond.notify_all()
+
def ShutDownGsutil():
"""Shut down all processes in consumer pools in preparation for exiting."""
for q in queues:
try:
q.cancel_join_thread()
- except:
+ except: # pylint: disable=bare-except
pass
for consumer_pool in consumer_pools:
consumer_pool.ShutDown()
+
+# pylint: disable=global-variable-undefined
+def _IncrementFailureCount():
+ global failure_count
+ if isinstance(failure_count, int):
+ failure_count += 1
+ else: # Otherwise it's a multiprocessing.Value() of type 'i'.
+ failure_count.value += 1
+
+
+# pylint: disable=global-variable-undefined
+def GetFailureCount():
+ """Returns the number of failures processed during calls to Apply()."""
+ try:
+ if isinstance(failure_count, int):
+ return failure_count
+ else: # It's a multiprocessing.Value() of type 'i'.
+ return failure_count.value
+ except NameError: # If it wasn't initialized, Apply() wasn't called.
+ return 0
+
+
+def ResetFailureCount():
+  """Resets the failure_count variable to 0 - useful if an error is expected."""
+ try:
+ global failure_count
+ if isinstance(failure_count, int):
+ failure_count = 0
+ else: # It's a multiprocessing.Value() of type 'i'.
+ failure_count = multiprocessing.Value('i', 0)
+ except NameError: # If it wasn't initialized, Apply() wasn't called.
+ pass
« no previous file with comments | « gslib/cloud_api_helper.py ('k') | gslib/command_runner.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698