| OLD | NEW |
| 1 # -*- coding: utf-8 -*- |
| 1 # Copyright 2010 Google Inc. All Rights Reserved. | 2 # Copyright 2010 Google Inc. All Rights Reserved. |
| 2 # | 3 # |
| 3 # Licensed under the Apache License, Version 2.0 (the "License"); | 4 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 # you may not use this file except in compliance with the License. | 5 # you may not use this file except in compliance with the License. |
| 5 # You may obtain a copy of the License at | 6 # You may obtain a copy of the License at |
| 6 # | 7 # |
| 7 # http://www.apache.org/licenses/LICENSE-2.0 | 8 # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 # | 9 # |
| 9 # Unless required by applicable law or agreed to in writing, software | 10 # Unless required by applicable law or agreed to in writing, software |
| 10 # distributed under the License is distributed on an "AS IS" BASIS, | 11 # distributed under the License is distributed on an "AS IS" BASIS, |
| 11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 # See the License for the specific language governing permissions and | 13 # See the License for the specific language governing permissions and |
| 13 # limitations under the License. | 14 # limitations under the License. |
| 14 | |
| 15 """Base class for gsutil commands. | 15 """Base class for gsutil commands. |
| 16 | 16 |
| 17 In addition to base class code, this file contains helpers that depend on base | 17 In addition to base class code, this file contains helpers that depend on base |
| 18 class state (such as GetAclCommandHelper). In general, functions that depend on | 18 class state (such as GetAndPrintAcl). In general, functions that depend on |
| 19 class state and that are used by multiple commands belong in this file. | 19 class state and that are used by multiple commands belong in this file. |
| 20 Functions that don't depend on class state belong in util.py, and non-shared | 20 Functions that don't depend on class state belong in util.py, and non-shared |
| 21 helpers belong in individual subclasses. | 21 helpers belong in individual subclasses. |
| 22 """ | 22 """ |
| 23 | 23 |
| 24 import boto | 24 from __future__ import absolute_import |
| 25 |
| 25 import codecs | 26 import codecs |
| 27 from collections import namedtuple |
| 26 import copy | 28 import copy |
| 27 import getopt | 29 import getopt |
| 28 import gslib | 30 from getopt import GetoptError |
| 29 import logging | 31 import logging |
| 30 import multiprocessing | 32 import multiprocessing |
| 31 import os | 33 import os |
| 32 import Queue | 34 import Queue |
| 33 import re | |
| 34 import signal | 35 import signal |
| 35 import sys | 36 import sys |
| 36 import textwrap | 37 import textwrap |
| 37 import threading | 38 import threading |
| 38 import traceback | 39 import traceback |
| 39 import wildcard_iterator | |
| 40 import xml.dom.minidom | |
| 41 | 40 |
| 42 from boto import handler | 41 import boto |
| 43 from boto.exception import GSResponseError | |
| 44 from boto.storage_uri import StorageUri | 42 from boto.storage_uri import StorageUri |
| 45 from collections import namedtuple | 43 import gslib |
| 46 from getopt import GetoptError | 44 from gslib.cloud_api import AccessDeniedException |
| 47 from gslib import util | 45 from gslib.cloud_api import ArgumentException |
| 46 from gslib.cloud_api import ServiceException |
| 47 from gslib.cloud_api_delegator import CloudApiDelegator |
| 48 from gslib.cs_api_map import ApiSelector |
| 49 from gslib.cs_api_map import GsutilApiMapFactory |
| 48 from gslib.exception import CommandException | 50 from gslib.exception import CommandException |
| 49 from gslib.help_provider import HelpProvider | 51 from gslib.help_provider import HelpProvider |
| 50 from gslib.name_expansion import NameExpansionIterator | 52 from gslib.name_expansion import NameExpansionIterator |
| 51 from gslib.name_expansion import NameExpansionIteratorQueue | 53 from gslib.name_expansion import NameExpansionResult |
| 52 from gslib.parallelism_framework_util import AtomicIncrementDict | 54 from gslib.parallelism_framework_util import AtomicIncrementDict |
| 53 from gslib.parallelism_framework_util import BasicIncrementDict | 55 from gslib.parallelism_framework_util import BasicIncrementDict |
| 54 from gslib.parallelism_framework_util import ThreadAndProcessSafeDict | 56 from gslib.parallelism_framework_util import ThreadAndProcessSafeDict |
| 55 from gslib.project_id import ProjectIdHandler | 57 from gslib.plurality_checkable_iterator import PluralityCheckableIterator |
| 56 from gslib.storage_uri_builder import StorageUriBuilder | 58 from gslib.storage_url import StorageUrlFromString |
| 59 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages |
| 60 from gslib.translation_helper import AclTranslation |
| 57 from gslib.util import GetConfigFilePath | 61 from gslib.util import GetConfigFilePath |
| 62 from gslib.util import HaveFileUrls |
| 63 from gslib.util import HaveProviderUrls |
| 58 from gslib.util import IS_WINDOWS | 64 from gslib.util import IS_WINDOWS |
| 59 from gslib.util import MultiprocessingIsAvailable | 65 from gslib.util import MultiprocessingIsAvailable |
| 60 from gslib.util import NO_MAX | 66 from gslib.util import NO_MAX |
| 61 from gslib.wildcard_iterator import ContainsWildcard | 67 from gslib.util import UrlsAreForSingleProvider |
| 62 from oauth2client.client import HAS_CRYPTO | 68 from gslib.util import UTF8 |
| 69 from gslib.wildcard_iterator import CreateWildcardIterator |
| 70 |
| 71 OFFER_GSUTIL_M_SUGGESTION_THRESHOLD = 5 |
| 63 | 72 |
| 64 if IS_WINDOWS: | 73 if IS_WINDOWS: |
| 65 import ctypes | 74 import ctypes # pylint: disable=g-import-not-at-top |
| 66 | 75 |
| 67 | 76 |
| 68 def _DefaultExceptionHandler(cls, e): | 77 def _DefaultExceptionHandler(cls, e): |
| 69 cls.logger.exception(e) | 78 cls.logger.exception(e) |
| 70 | 79 |
| 80 |
| 71 def CreateGsutilLogger(command_name): | 81 def CreateGsutilLogger(command_name): |
| 72 """Creates a logger that resembles 'print' output, but is thread safe and | 82 """Creates a logger that resembles 'print' output. |
| 73 abides by gsutil -d/-D/-DD/-q options. | 83 |
| 84 This logger abides by gsutil -d/-D/-DD/-q options. |
| 74 | 85 |
| 75 By default (if none of the above options is specified) the logger will display | 86 By default (if none of the above options is specified) the logger will display |
| 76 all messages logged with level INFO or above. Log propagation is disabled. | 87 all messages logged with level INFO or above. Log propagation is disabled. |
| 77 | 88 |
| 89 Args: |
| 90 command_name: Command name to create logger for. |
| 91 |
| 78 Returns: | 92 Returns: |
| 79 A logger object. | 93 A logger object. |
| 80 """ | 94 """ |
| 81 log = logging.getLogger(command_name) | 95 log = logging.getLogger(command_name) |
| 82 log.propagate = False | 96 log.propagate = False |
| 83 log.setLevel(logging.root.level) | 97 log.setLevel(logging.root.level) |
| 84 log_handler = logging.StreamHandler() | 98 log_handler = logging.StreamHandler() |
| 85 log_handler.setFormatter(logging.Formatter('%(message)s')) | 99 log_handler.setFormatter(logging.Formatter('%(message)s')) |
| 86 # Commands that call other commands (like mv) would cause log handlers to be | 100 # Commands that call other commands (like mv) would cause log handlers to be |
| 87 # added more than once, so avoid adding if one is already present. | 101 # added more than once, so avoid adding if one is already present. |
| 88 if not log.handlers: | 102 if not log.handlers: |
| 89 log.addHandler(log_handler) | 103 log.addHandler(log_handler) |
| 90 return log | 104 return log |
| 91 | 105 |
| 92 def _UriArgChecker(command_instance, uri): | 106 |
| 93 exp_src_uri = command_instance.suri_builder.StorageUri( | 107 def _UrlArgChecker(command_instance, url): |
| 94 uri.GetExpandedUriStr()) | 108 if not command_instance.exclude_symlinks: |
| 95 command_instance.logger.debug('process %d is handling uri %s', | 109 return True |
| 96 os.getpid(), exp_src_uri) | 110 exp_src_url = url.expanded_storage_url |
| 97 if (command_instance.exclude_symlinks and exp_src_uri.is_file_uri() | 111 if exp_src_url.IsFileUrl() and os.path.islink(exp_src_url.object_name): |
| 98 and os.path.islink(exp_src_uri.object_name)): | 112 command_instance.logger.info('Skipping symbolic link %s...', exp_src_url) |
| 99 command_instance.logger.info('Skipping symbolic link %s...', exp_src_uri) | |
| 100 return False | 113 return False |
| 101 return True | 114 return True |
| 102 | 115 |
| 103 | 116 |
| 104 def DummyArgChecker(command_instance, arg): | 117 def DummyArgChecker(*unused_args): |
| 105 return True | 118 return True |
| 106 | 119 |
| 107 def _SetAclFuncWrapper(cls, name_expansion_result): | |
| 108 return cls._SetAclFunc(name_expansion_result) | |
| 109 | 120 |
| 110 def _SetAclExceptionHandler(cls, e): | 121 def SetAclFuncWrapper(cls, name_expansion_result, thread_state=None): |
| 122 return cls.SetAclFunc(name_expansion_result, thread_state=thread_state) |
| 123 |
| 124 |
| 125 def SetAclExceptionHandler(cls, e): |
| 111 """Exception handler that maintains state about post-completion status.""" | 126 """Exception handler that maintains state about post-completion status.""" |
| 112 cls.logger.error(str(e)) | 127 cls.logger.error(str(e)) |
| 113 cls.everything_set_okay = False | 128 cls.everything_set_okay = False |
| 114 | 129 |
| 115 # We will keep this list of all thread- or process-safe queues ever created by | 130 # We will keep this list of all thread- or process-safe queues ever created by |
| 116 # the main thread so that we can forcefully kill them upon shutdown. Otherwise, | 131 # the main thread so that we can forcefully kill them upon shutdown. Otherwise, |
| 117 # we encounter a Python bug in which empty queues block forever on join (which | 132 # we encounter a Python bug in which empty queues block forever on join (which |
| 118 # is called as part of the Python exit function cleanup) under the impression | 133 # is called as part of the Python exit function cleanup) under the impression |
| 119 # that they are non-empty. | 134 # that they are non-empty. |
| 120 # However, this also lets us shut down somewhat more cleanly when interrupted. | 135 # However, this also lets us shut down somewhat more cleanly when interrupted. |
| 121 queues = [] | 136 queues = [] |
| 122 | 137 |
| 138 |
| 123 def _NewMultiprocessingQueue(): | 139 def _NewMultiprocessingQueue(): |
| 124 queue = multiprocessing.Queue(MAX_QUEUE_SIZE) | 140 queue = multiprocessing.Queue(MAX_QUEUE_SIZE) |
| 125 queues.append(queue) | 141 queues.append(queue) |
| 126 return queue | 142 return queue |
| 127 | 143 |
| 144 |
| 128 def _NewThreadsafeQueue(): | 145 def _NewThreadsafeQueue(): |
| 129 queue = Queue.Queue(MAX_QUEUE_SIZE) | 146 queue = Queue.Queue(MAX_QUEUE_SIZE) |
| 130 queues.append(queue) | 147 queues.append(queue) |
| 131 return queue | 148 return queue |
| 132 | 149 |
| 133 | |
| 134 # command_spec key constants. | |
| 135 COMMAND_NAME = 'command_name' | |
| 136 COMMAND_NAME_ALIASES = 'command_name_aliases' | |
| 137 MIN_ARGS = 'min_args' | |
| 138 MAX_ARGS = 'max_args' | |
| 139 SUPPORTED_SUB_ARGS = 'supported_sub_args' | |
| 140 FILE_URIS_OK = 'file_uri_ok' | |
| 141 PROVIDER_URIS_OK = 'provider_uri_ok' | |
| 142 URIS_START_ARG = 'uris_start_arg' | |
| 143 | |
| 144 # The maximum size of a process- or thread-safe queue. Imposing this limit | 150 # The maximum size of a process- or thread-safe queue. Imposing this limit |
| 145 # prevents us from needing to hold an arbitrary amount of data in memory. | 151 # prevents us from needing to hold an arbitrary amount of data in memory. |
| 146 # However, setting this number too high (e.g., >= 32768 on OS X) can cause | 152 # However, setting this number too high (e.g., >= 32768 on OS X) can cause |
| 147 # problems on some operating systems. | 153 # problems on some operating systems. |
| 148 MAX_QUEUE_SIZE = 32500 | 154 MAX_QUEUE_SIZE = 32500 |
| 149 | 155 |
| 150 # The maximum depth of the tree of recursive calls to command.Apply. This is | 156 # The maximum depth of the tree of recursive calls to command.Apply. This is |
| 151 # an arbitrary limit put in place to prevent developers from accidentally | 157 # an arbitrary limit put in place to prevent developers from accidentally |
| 152 # causing problems with infinite recursion, and it can be increased if needed. | 158 # causing problems with infinite recursion, and it can be increased if needed. |
| 153 MAX_RECURSIVE_DEPTH = 5 | 159 MAX_RECURSIVE_DEPTH = 5 |
| 154 | 160 |
| 155 ZERO_TASKS_TO_DO_ARGUMENT = ("There were no", "tasks to do") | 161 ZERO_TASKS_TO_DO_ARGUMENT = ('There were no', 'tasks to do') |
| 156 | 162 |
| 157 # Map from deprecated aliases to the current command and subcommands that | 163 # Map from deprecated aliases to the current command and subcommands that |
| 158 # provide the same behavior. | 164 # provide the same behavior. |
| 159 # TODO: Remove this map and deprecate old commands on 9/9/14. | 165 # TODO: Remove this map and deprecate old commands on 9/9/14. |
| 160 OLD_ALIAS_MAP = {'chacl': ['acl', 'ch'], | 166 OLD_ALIAS_MAP = {'chacl': ['acl', 'ch'], |
| 161 'getacl': ['acl', 'get'], | 167 'getacl': ['acl', 'get'], |
| 162 'setacl': ['acl', 'set'], | 168 'setacl': ['acl', 'set'], |
| 163 'getcors': ['cors', 'get'], | 169 'getcors': ['cors', 'get'], |
| 164 'setcors': ['cors', 'set'], | 170 'setcors': ['cors', 'set'], |
| 165 'chdefacl': ['defacl', 'ch'], | 171 'chdefacl': ['defacl', 'ch'], |
| 166 'getdefacl': ['defacl', 'get'], | 172 'getdefacl': ['defacl', 'get'], |
| 167 'setdefacl': ['defacl', 'set'], | 173 'setdefacl': ['defacl', 'set'], |
| 168 'disablelogging': ['logging', 'set', 'off'], | 174 'disablelogging': ['logging', 'set', 'off'], |
| 169 'enablelogging': ['logging', 'set', 'on'], | 175 'enablelogging': ['logging', 'set', 'on'], |
| 170 'getlogging': ['logging', 'get'], | 176 'getlogging': ['logging', 'get'], |
| 171 'getversioning': ['versioning', 'get'], | 177 'getversioning': ['versioning', 'get'], |
| 172 'setversioning': ['versioning', 'set'], | 178 'setversioning': ['versioning', 'set'], |
| 173 'getwebcfg': ['web', 'get'], | 179 'getwebcfg': ['web', 'get'], |
| 174 'setwebcfg': ['web', 'set']} | 180 'setwebcfg': ['web', 'set']} |
| 175 | 181 |
| 176 | 182 |
| 177 # Declare all of the module level variables - see | 183 # Declare all of the module level variables - see |
| 178 # InitializeMultiprocessingVariables for an explanation of why this is | 184 # InitializeMultiprocessingVariables for an explanation of why this is |
| 179 # necessary. | 185 # necessary. |
| 186 # pylint: disable=global-at-module-level |
| 180 global manager, consumer_pools, task_queues, caller_id_lock, caller_id_counter | 187 global manager, consumer_pools, task_queues, caller_id_lock, caller_id_counter |
| 181 global total_tasks, call_completed_map, global_return_values_map | 188 global total_tasks, call_completed_map, global_return_values_map |
| 182 global need_pool_or_done_cond, caller_id_finished_count, new_pool_needed | 189 global need_pool_or_done_cond, caller_id_finished_count, new_pool_needed |
| 183 global current_max_recursive_level, shared_vars_map, shared_vars_list_map | 190 global current_max_recursive_level, shared_vars_map, shared_vars_list_map |
| 184 global class_map | 191 global class_map, worker_checking_level_lock, failure_count |
| 185 | 192 |
| 186 | 193 |
| 187 def InitializeMultiprocessingVariables(): | 194 def InitializeMultiprocessingVariables(): |
| 188 """ | 195 """Initializes module-level variables that will be inherited by subprocesses. |
| 189 Used to initialize module-level variables that will be inherited by | 196 |
| 190 subprocesses. On Windows, a multiprocessing.Manager object should only | 197 On Windows, a multiprocessing.Manager object should only |
| 191 be created within an "if __name__ == '__main__':" block. This function | 198 be created within an "if __name__ == '__main__':" block. This function |
| 192 must be called, otherwise every command that calls Command.Apply will fail. | 199 must be called, otherwise every command that calls Command.Apply will fail. |
| 193 """ | 200 """ |
| 194 # This list of global variables must exactly match the above list of | 201 # This list of global variables must exactly match the above list of |
| 195 # declarations. | 202 # declarations. |
| 203 # pylint: disable=global-variable-undefined |
| 196 global manager, consumer_pools, task_queues, caller_id_lock, caller_id_counter | 204 global manager, consumer_pools, task_queues, caller_id_lock, caller_id_counter |
| 197 global total_tasks, call_completed_map, global_return_values_map | 205 global total_tasks, call_completed_map, global_return_values_map |
| 198 global need_pool_or_done_cond, caller_id_finished_count, new_pool_needed | 206 global need_pool_or_done_cond, caller_id_finished_count, new_pool_needed |
| 199 global current_max_recursive_level, shared_vars_map, shared_vars_list_map | 207 global current_max_recursive_level, shared_vars_map, shared_vars_list_map |
| 200 global class_map | 208 global class_map, worker_checking_level_lock, failure_count |
| 201 | 209 |
| 202 manager = multiprocessing.Manager() | 210 manager = multiprocessing.Manager() |
| 203 | 211 |
| 204 consumer_pools = [] | 212 consumer_pools = [] |
| 205 | 213 |
| 206 # List of all existing task queues - used by all pools to find the queue | 214 # List of all existing task queues - used by all pools to find the queue |
| 207 # that's appropriate for the given recursive_apply_level. | 215 # that's appropriate for the given recursive_apply_level. |
| 208 task_queues = [] | 216 task_queues = [] |
| 209 | 217 |
| 210 # Used to assign a globally unique caller ID to each Apply call. | 218 # Used to assign a globally unique caller ID to each Apply call. |
| 211 caller_id_lock = manager.Lock() | 219 caller_id_lock = manager.Lock() |
| 212 caller_id_counter = multiprocessing.Value('i', 0) | 220 caller_id_counter = multiprocessing.Value('i', 0) |
| 213 | 221 |
| 214 # Map from caller_id to total number of tasks to be completed for that ID. | 222 # Map from caller_id to total number of tasks to be completed for that ID. |
| 215 total_tasks = ThreadAndProcessSafeDict(manager) | 223 total_tasks = ThreadAndProcessSafeDict(manager) |
| 216 | 224 |
| 217 # Map from caller_id to a boolean which is True iff all its tasks are | 225 # Map from caller_id to a boolean which is True iff all its tasks are |
| 218 # finished. | 226 # finished. |
| 219 call_completed_map = ThreadAndProcessSafeDict(manager) | 227 call_completed_map = ThreadAndProcessSafeDict(manager) |
| 220 | 228 |
| 221 # Used to keep track of the set of return values for each caller ID. | 229 # Used to keep track of the set of return values for each caller ID. |
| 222 global_return_values_map = AtomicIncrementDict(manager) | 230 global_return_values_map = AtomicIncrementDict(manager) |
| 223 | 231 |
| 224 # Condition used to notify any waiting threads that a task has finished or | 232 # Condition used to notify any waiting threads that a task has finished or |
| 225 # that a call to Apply needs a new set of consumer processes. | 233 # that a call to Apply needs a new set of consumer processes. |
| 226 need_pool_or_done_cond = manager.Condition() | 234 need_pool_or_done_cond = manager.Condition() |
| 227 | 235 |
| 236 # Lock used to prevent multiple worker processes from asking the main thread |
| 237 # to create a new consumer pool for the same level. |
| 238 worker_checking_level_lock = manager.Lock() |
| 239 |
| 228 # Map from caller_id to the current number of completed tasks for that ID. | 240 # Map from caller_id to the current number of completed tasks for that ID. |
| 229 caller_id_finished_count = AtomicIncrementDict(manager) | 241 caller_id_finished_count = AtomicIncrementDict(manager) |
| 230 | 242 |
| 231 # Used as a way for the main thread to distinguish between being woken up | 243 # Used as a way for the main thread to distinguish between being woken up |
| 232 # by another call finishing and being woken up by a call that needs a new set | 244 # by another call finishing and being woken up by a call that needs a new set |
| 233 # of consumer processes. | 245 # of consumer processes. |
| 234 new_pool_needed = multiprocessing.Value('i', 0) | 246 new_pool_needed = multiprocessing.Value('i', 0) |
| 235 | 247 |
| 236 current_max_recursive_level = multiprocessing.Value('i', 0) | 248 current_max_recursive_level = multiprocessing.Value('i', 0) |
| 237 | 249 |
| 238 # Map from (caller_id, name) to the value of that shared variable. | 250 # Map from (caller_id, name) to the value of that shared variable. |
| 239 shared_vars_map = AtomicIncrementDict(manager) | 251 shared_vars_map = AtomicIncrementDict(manager) |
| 240 shared_vars_list_map = ThreadAndProcessSafeDict(manager) | 252 shared_vars_list_map = ThreadAndProcessSafeDict(manager) |
| 241 | 253 |
| 242 # Map from caller_id to calling class. | 254 # Map from caller_id to calling class. |
| 243 class_map = manager.dict() | 255 class_map = manager.dict() |
| 244 | 256 |
| 257 # Number of tasks that resulted in an exception in calls to Apply(). |
| 258 failure_count = multiprocessing.Value('i', 0) |
| 245 | 259 |
| 246 class Command(object): | |
| 247 REQUIRED_SPEC_KEYS = [COMMAND_NAME] | |
| 248 | 260 |
| 249 # Each subclass must define the following map, minimally including the | 261 # Each subclass of Command must define a property named 'command_spec' that is |
| 250 # keys in REQUIRED_SPEC_KEYS; other values below will be used as defaults, | 262 # an instance of the following class. |
| 251 # although for readability subclasses should specify the complete map. | 263 CommandSpec = namedtuple('CommandSpec', [ |
| 252 command_spec = { | |
| 253 # Name of command. | 264 # Name of command. |
| 254 COMMAND_NAME : None, | 265 'command_name', |
| 255 # List of command name aliases. | 266 # List of command name aliases. |
| 256 COMMAND_NAME_ALIASES : [], | 267 'command_name_aliases', |
| 257 # Min number of args required by this command. | 268 # Min number of args required by this command. |
| 258 MIN_ARGS : 0, | 269 'min_args', |
| 259 # Max number of args required by this command, or NO_MAX. | 270 # Max number of args required by this command, or NO_MAX. |
| 260 MAX_ARGS : NO_MAX, | 271 'max_args', |
| 261 # Getopt-style string specifying acceptable sub args. | 272 # Getopt-style string specifying acceptable sub args. |
| 262 SUPPORTED_SUB_ARGS : '', | 273 'supported_sub_args', |
| 263 # True if file URIs are acceptable for this command. | 274 # True if file URLs are acceptable for this command. |
| 264 FILE_URIS_OK : False, | 275 'file_url_ok', |
| 265 # True if provider-only URIs are acceptable for this command. | 276 # True if provider-only URLs are acceptable for this command. |
| 266 PROVIDER_URIS_OK : False, | 277 'provider_url_ok', |
| 267 # Index in args of first URI arg. | 278 # Index in args of first URL arg. |
| 268 URIS_START_ARG : 0, | 279 'urls_start_arg', |
| 269 } | 280 # List of supported APIs |
| 270 _default_command_spec = command_spec | 281 'gs_api_support', |
| 271 help_spec = HelpProvider.help_spec | 282 # Default API to use for this command |
| 272 _commands_with_subcommands_and_subopts = ['acl', 'defacl', 'logging', 'web'] | 283 'gs_default_api', |
| 284 # Private arguments (for internal testing) |
| 285 'supported_private_args', |
| 286 ]) |
| 273 | 287 |
| 274 """Define an empty test specification, which derived classes must populate. | |
| 275 | 288 |
| 276 This is a list of tuples containing the following values: | 289 class Command(HelpProvider): |
| 290 """Base class for all gsutil commands.""" |
| 277 | 291 |
| 278 step_name - mnemonic name for test, displayed when test is run | 292 # Each subclass must override this with an instance of CommandSpec. |
| 279 cmd_line - shell command line to run test | 293 command_spec = None |
| 280 expect_ret or None - expected return code from test (None means ignore) | |
| 281 (result_file, expect_file) or None - tuple of result file and expected | |
| 282 file to diff for additional test | |
| 283 verification beyond the return code | |
| 284 (None means no diff requested) | |
| 285 Notes: | |
| 286 | 294 |
| 287 - Setting expected_ret to None means there is no expectation and, | 295 _commands_with_subcommands_and_subopts = ['acl', 'defacl', 'logging', 'web', |
| 288 hence, any returned value will pass. | 296 'notification'] |
| 289 | 297 |
| 290 - Any occurrences of the string 'gsutil' in the cmd_line parameter | |
| 291 are expanded to the full path to the gsutil command under test. | |
| 292 | |
| 293 - The cmd_line, result_file and expect_file parameters may | |
| 294 contain the following special substrings: | |
| 295 | |
| 296 $Bn - converted to one of 10 unique-for-testing bucket names (n=0..9) | |
| 297 $On - converted to one of 10 unique-for-testing object names (n=0..9) | |
| 298 $Fn - converted to one of 10 unique-for-testing file names (n=0..9) | |
| 299 $G - converted to the directory where gsutil is installed. Useful for | |
| 300 referencing test data. | |
| 301 | |
| 302 - The generated file names are full pathnames, whereas the generated | |
| 303 bucket and object names are simple relative names. | |
| 304 | |
| 305 - Tests with a non-None result_file and expect_file automatically | |
| 306 trigger an implicit diff of the two files. | |
| 307 | |
| 308 - These test specifications, in combination with the conversion strings | |
| 309 allow tests to be constructed parametrically. For example, here's an | |
| 310 annotated subset of a test_steps for the cp command: | |
| 311 | |
| 312 # Copy local file to object, verify 0 return code. | |
| 313 ('simple cp', 'gsutil cp $F1 gs://$B1/$O1', 0, None, None), | |
| 314 # Copy uploaded object back to local file and diff vs. orig file. | |
| 315 ('verify cp', 'gsutil cp gs://$B1/$O1 $F2', 0, '$F2', '$F1'), | |
| 316 | |
| 317 - After pattern substitution, the specs are run sequentially, in the | |
| 318 order in which they appear in the test_steps list. | |
| 319 """ | |
| 320 test_steps = [] | |
| 321 | |
| 322 # This keeps track of the recursive depth of the current call to Apply. | 298 # This keeps track of the recursive depth of the current call to Apply. |
| 323 recursive_apply_level = 0 | 299 recursive_apply_level = 0 |
| 324 | 300 |
| 325 # If the multiprocessing module isn't available, we'll use this to keep track | 301 # If the multiprocessing module isn't available, we'll use this to keep track |
| 326 # of the caller_id. | 302 # of the caller_id. |
| 327 sequential_caller_id = -1 | 303 sequential_caller_id = -1 |
| 328 | 304 |
| 305 @staticmethod |
| 306 def CreateCommandSpec(command_name, command_name_aliases=None, min_args=0, |
| 307 max_args=NO_MAX, supported_sub_args='', |
| 308 file_url_ok=False, provider_url_ok=False, |
| 309 urls_start_arg=0, gs_api_support=None, |
| 310 gs_default_api=None, supported_private_args=None): |
| 311 """Creates an instance of CommandSpec, with defaults.""" |
| 312 return CommandSpec( |
| 313 command_name=command_name, |
| 314 command_name_aliases=command_name_aliases or [], |
| 315 min_args=min_args, |
| 316 max_args=max_args, |
| 317 supported_sub_args=supported_sub_args, |
| 318 file_url_ok=file_url_ok, |
| 319 provider_url_ok=provider_url_ok, |
| 320 urls_start_arg=urls_start_arg, |
| 321 gs_api_support=gs_api_support or [ApiSelector.XML], |
| 322 gs_default_api=gs_default_api or ApiSelector.XML, |
| 323 supported_private_args=supported_private_args) |
| 324 |
| 329 # Define a convenience property for command name, since it's used many places. | 325 # Define a convenience property for command name, since it's used many places. |
| 330 def _GetDefaultCommandName(self): | 326 def _GetDefaultCommandName(self): |
| 331 return self.command_spec[COMMAND_NAME] | 327 return self.command_spec.command_name |
| 332 command_name = property(_GetDefaultCommandName) | 328 command_name = property(_GetDefaultCommandName) |
| 333 | 329 |
| 334 def _CalculateUrisStartArg(self): | 330 def _CalculateUrlsStartArg(self): |
| 335 """Calculate the index in args of the first URI arg. By default, just use | 331 """Calculate the index in args of the first URL arg. |
| 336 the value from command_spec. | 332 |
| 333 Returns: |
| 334 Index of the first URL arg (according to the command spec). |
| 337 """ | 335 """ |
| 338 return self.command_spec[URIS_START_ARG] | 336 return self.command_spec.urls_start_arg |
| 339 | 337 |
| 340 def _TranslateDeprecatedAliases(self, args): | 338 def _TranslateDeprecatedAliases(self, args): |
| 341 """For commands that have deprecated aliases, this will map the aliases to | 339 """Map deprecated aliases to the corresponding new command, and warn.""" |
| 342 the corresponding new command and also warn the user about deprecation. | |
| 343 """ | |
| 344 new_command_args = OLD_ALIAS_MAP.get(self.command_alias_used, None) | 340 new_command_args = OLD_ALIAS_MAP.get(self.command_alias_used, None) |
| 345 if new_command_args: | 341 if new_command_args: |
| 346 # Prepend any subcommands for the new command. The command name itself | 342 # Prepend any subcommands for the new command. The command name itself |
| 347 # is not part of the args, so leave it out. | 343 # is not part of the args, so leave it out. |
| 348 args = new_command_args[1:] + args | 344 args = new_command_args[1:] + args |
| 349 self.logger.warn('\n'.join(textwrap.wrap(( | 345 self.logger.warn('\n'.join(textwrap.wrap( |
| 350 'You are using a deprecated alias, "%(used_alias)s", for the ' | 346 ('You are using a deprecated alias, "%(used_alias)s", for the ' |
| 351 '"%(command_name)s" command. This will stop working on 9/9/2014. ' | 347 '"%(command_name)s" command. This will stop working on 9/9/2014. ' |
| 352 'Please use "%(command_name)s" with the appropriate sub-command in ' | 348 'Please use "%(command_name)s" with the appropriate sub-command in ' |
| 353 'the future. See "gsutil help %(command_name)s" for details.') % | 349 'the future. See "gsutil help %(command_name)s" for details.') % |
| 354 {'used_alias': self.command_alias_used, | 350 {'used_alias': self.command_alias_used, |
| 355 'command_name': self.command_name }))) | 351 'command_name': self.command_name}))) |
| 356 return args | 352 return args |
| 357 | 353 |
| 358 def __init__(self, command_runner, args, headers, debug, parallel_operations, | 354 def __init__(self, command_runner, args, headers, debug, parallel_operations, |
| 359 config_file_list, bucket_storage_uri_class, test_method=None, | 355 bucket_storage_uri_class, gsutil_api_class_map_factory, |
| 360 logging_filters=None, command_alias_used=None): | 356 test_method=None, logging_filters=None, |
| 361 """ | 357 command_alias_used=None): |
| 358 """Instantiates a Command. |
| 359 |
| 362 Args: | 360 Args: |
| 363 command_runner: CommandRunner (for commands built atop other commands). | 361 command_runner: CommandRunner (for commands built atop other commands). |
| 364 args: Command-line args (arg0 = actual arg, not command name ala bash). | 362 args: Command-line args (arg0 = actual arg, not command name ala bash). |
| 365 headers: Dictionary containing optional HTTP headers to pass to boto. | 363 headers: Dictionary containing optional HTTP headers to pass to boto. |
| 366 debug: Debug level to pass in to boto connection (range 0..3). | 364 debug: Debug level to pass in to boto connection (range 0..3). |
| 367 parallel_operations: Should command operations be executed in parallel? | 365 parallel_operations: Should command operations be executed in parallel? |
| 368 config_file_list: Config file list returned by GetBotoConfigFileList(). | |
| 369 bucket_storage_uri_class: Class to instantiate for cloud StorageUris. | 366 bucket_storage_uri_class: Class to instantiate for cloud StorageUris. |
| 370 Settable for testing/mocking. | 367 Settable for testing/mocking. |
| 368 gsutil_api_class_map_factory: Creates map of cloud storage interfaces. |
| 369 Settable for testing/mocking. |
| 371 test_method: Optional general purpose method for testing purposes. | 370 test_method: Optional general purpose method for testing purposes. |
| 372 Application and semantics of this method will vary by | 371 Application and semantics of this method will vary by |
| 373 command and test type. | 372 command and test type. |
| 374 logging_filters: Optional list of logging.Filters to apply to this | 373 logging_filters: Optional list of logging.Filters to apply to this |
| 375 command's logger. | 374 command's logger. |
| 376 command_alias_used: The alias that was actually used when running this | 375 command_alias_used: The alias that was actually used when running this |
| 377 command (as opposed to the "official" command name, | 376 command (as opposed to the "official" command name, |
| 378 which will always correspond to the file name). | 377 which will always correspond to the file name). |
| 379 | 378 |
| 380 Implementation note: subclasses shouldn't need to define an __init__ | 379 Implementation note: subclasses shouldn't need to define an __init__ |
| 381 method, and instead depend on the shared initialization that happens | 380 method, and instead depend on the shared initialization that happens |
| 382 here. If you do define an __init__ method in a subclass you'll need to | 381 here. If you do define an __init__ method in a subclass you'll need to |
| 383 explicitly call super().__init__(). But you're encouraged not to do this, | 382 explicitly call super().__init__(). But you're encouraged not to do this, |
| 384 because it will make changing the __init__ interface more painful. | 383 because it will make changing the __init__ interface more painful. |
| 385 """ | 384 """ |
| 386 # Save class values from constructor params. | 385 # Save class values from constructor params. |
| 387 self.command_runner = command_runner | 386 self.command_runner = command_runner |
| 388 self.unparsed_args = args | 387 self.unparsed_args = args |
| 389 self.headers = headers | 388 self.headers = headers |
| 390 self.debug = debug | 389 self.debug = debug |
| 391 self.parallel_operations = parallel_operations | 390 self.parallel_operations = parallel_operations |
| 392 self.config_file_list = config_file_list | |
| 393 self.bucket_storage_uri_class = bucket_storage_uri_class | 391 self.bucket_storage_uri_class = bucket_storage_uri_class |
| 392 self.gsutil_api_class_map_factory = gsutil_api_class_map_factory |
| 394 self.test_method = test_method | 393 self.test_method = test_method |
| 395 self.exclude_symlinks = False | 394 self.exclude_symlinks = False |
| 396 self.recursion_requested = False | 395 self.recursion_requested = False |
| 397 self.all_versions = False | 396 self.all_versions = False |
| 398 self.command_alias_used = command_alias_used | 397 self.command_alias_used = command_alias_used |
| 399 | 398 |
| 400 # Global instance of a threaded logger object. | 399 # Global instance of a threaded logger object. |
| 401 self.logger = CreateGsutilLogger(self.command_name) | 400 self.logger = CreateGsutilLogger(self.command_name) |
| 402 if logging_filters: | 401 if logging_filters: |
| 403 for filter in logging_filters: | 402 for log_filter in logging_filters: |
| 404 self.logger.addFilter(filter) | 403 self.logger.addFilter(log_filter) |
| 405 | 404 |
| 406 # Process sub-command instance specifications. | 405 if self.command_spec is None: |
| 407 # First, ensure subclass implementation sets all required keys. | 406 raise CommandException('"%s" command implementation is missing a ' |
| 408 for k in self.REQUIRED_SPEC_KEYS: | 407 'command_spec definition.' % self.command_name) |
| 409 if k not in self.command_spec or self.command_spec[k] is None: | |
| 410 raise CommandException('"%s" command implementation is missing %s ' | |
| 411 'specification' % (self.command_name, k)) | |
| 412 # Now override default command_spec with subclass-specified values. | |
| 413 tmp = self._default_command_spec | |
| 414 tmp.update(self.command_spec) | |
| 415 self.command_spec = tmp | |
| 416 del tmp | |
| 417 | |
| 418 # Make sure command provides a test specification. | |
| 419 if not self.test_steps: | |
| 420 # TODO: Uncomment following lines when test feature is ready. | |
| 421 #raise CommandException('"%s" command implementation is missing test ' | |
| 422 #'specification' % self.command_name) | |
| 423 pass | |
| 424 | 408 |
| 425 # Parse and validate args. | 409 # Parse and validate args. |
| 426 args = self._TranslateDeprecatedAliases(args) | 410 args = self._TranslateDeprecatedAliases(args) |
| 427 try: | 411 try: |
| 428 (self.sub_opts, self.args) = getopt.getopt( | 412 (self.sub_opts, self.args) = getopt.getopt( |
| 429 args, self.command_spec[SUPPORTED_SUB_ARGS]) | 413 args, self.command_spec.supported_sub_args, |
| 414 self.command_spec.supported_private_args or []) |
| 430 except GetoptError, e: | 415 except GetoptError, e: |
| 431 raise CommandException('%s for "%s" command.' % (e.msg, | 416 raise CommandException('%s for "%s" command.' % (e.msg, |
| 432 self.command_name)) | 417 self.command_name)) |
| 433 self.command_spec[URIS_START_ARG] = self._CalculateUrisStartArg() | 418 # Named tuple public functions start with _ |
| 434 | 419 # pylint: disable=protected-access |
| 435 if (len(self.args) < self.command_spec[MIN_ARGS] | 420 self.command_spec = self.command_spec._replace( |
| 436 or len(self.args) > self.command_spec[MAX_ARGS]): | 421 urls_start_arg=self._CalculateUrlsStartArg()) |
| 422 |
| 423 if (len(self.args) < self.command_spec.min_args |
| 424 or len(self.args) > self.command_spec.max_args): |
| 437 self._RaiseWrongNumberOfArgumentsException() | 425 self._RaiseWrongNumberOfArgumentsException() |
| 438 | 426 |
| 439 if not (self.command_name in | 427 if self.command_name not in self._commands_with_subcommands_and_subopts: |
| 440 self._commands_with_subcommands_and_subopts): | |
| 441 self.CheckArguments() | 428 self.CheckArguments() |
| 442 | 429 |
| 443 self.proj_id_handler = ProjectIdHandler() | 430 # Build the support and default maps from the command spec. |
| 444 self.suri_builder = StorageUriBuilder(debug, bucket_storage_uri_class) | 431 support_map = { |
| 432 'gs': self.command_spec.gs_api_support, |
| 433 's3': [ApiSelector.XML] |
| 434 } |
| 435 default_map = { |
| 436 'gs': self.command_spec.gs_default_api, |
| 437 's3': ApiSelector.XML |
| 438 } |
| 439 self.gsutil_api_map = GsutilApiMapFactory.GetApiMap( |
| 440 self.gsutil_api_class_map_factory, support_map, default_map) |
| 441 |
| 442 self.project_id = None |
| 443 self.gsutil_api = CloudApiDelegator( |
| 444 bucket_storage_uri_class, self.gsutil_api_map, |
| 445 self.logger, debug=self.debug) |
| 445 | 446 |
| 446 # Cross-platform path to run gsutil binary. | 447 # Cross-platform path to run gsutil binary. |
| 447 self.gsutil_cmd = '' | 448 self.gsutil_cmd = '' |
| 448 # Cross-platform list containing gsutil path for use with subprocess. | |
| 449 self.gsutil_exec_list = [] | |
| 450 # If running on Windows, invoke python interpreter explicitly. | 449 # If running on Windows, invoke python interpreter explicitly. |
| 451 if gslib.util.IS_WINDOWS: | 450 if gslib.util.IS_WINDOWS: |
| 452 self.gsutil_cmd += 'python ' | 451 self.gsutil_cmd += 'python ' |
| 453 self.gsutil_exec_list += ['python'] | |
| 454 # Add full path to gsutil to make sure we test the correct version. | 452 # Add full path to gsutil to make sure we test the correct version. |
| 455 self.gsutil_path = gslib.GSUTIL_PATH | 453 self.gsutil_path = gslib.GSUTIL_PATH |
| 456 self.gsutil_cmd += self.gsutil_path | 454 self.gsutil_cmd += self.gsutil_path |
| 457 self.gsutil_exec_list += [self.gsutil_path] | |
| 458 | 455 |
| 459 # We're treating recursion_requested like it's used by all commands, but | 456 # We're treating recursion_requested like it's used by all commands, but |
| 460 # only some of the commands accept the -R option. | 457 # only some of the commands accept the -R option. |
| 461 if self.sub_opts: | 458 if self.sub_opts: |
| 462 for o, unused_a in self.sub_opts: | 459 for o, unused_a in self.sub_opts: |
| 463 if o == '-r' or o == '-R': | 460 if o == '-r' or o == '-R': |
| 464 self.recursion_requested = True | 461 self.recursion_requested = True |
| 465 break | 462 break |
| 466 | 463 |
| 467 self.multiprocessing_is_available = MultiprocessingIsAvailable()[0] | 464 self.multiprocessing_is_available = MultiprocessingIsAvailable()[0] |
| 468 | 465 |
| 469 def _RaiseWrongNumberOfArgumentsException(self): | 466 def _RaiseWrongNumberOfArgumentsException(self): |
| 470 """Raise an exception indicating that the wrong number of arguments was | 467 """Raises exception for wrong number of arguments supplied to command.""" |
| 471 provided for this command. | 468 if len(self.args) > self.command_spec.max_args: |
| 472 """ | |
| 473 if len(self.args) > self.command_spec[MAX_ARGS]: | |
| 474 message = ('The %s command accepts at most %d arguments.' % | 469 message = ('The %s command accepts at most %d arguments.' % |
| 475 (self.command_name, self.command_spec[MAX_ARGS])) | 470 (self.command_name, self.command_spec.max_args)) |
| 476 elif len(self.args) < self.command_spec[MIN_ARGS]: | 471 elif len(self.args) < self.command_spec.min_args: |
| 477 message = ('The %s command requires at least %d arguments.' % | 472 message = ('The %s command requires at least %d arguments.' % |
| 478 (self.command_name, self.command_spec[MIN_ARGS])) | 473 (self.command_name, self.command_spec.min_args)) |
| 479 raise CommandException(message) | 474 raise CommandException(message) |
| 480 | 475 |
| 481 def CheckArguments(self): | 476 def CheckArguments(self): |
| 482 """Checks that the arguments provided on the command line fit the | 477 """Checks that command line arguments match the command_spec. |
| 483 expectations of the command_spec. Any commands in | |
| 484 self._commands_with_subcommands_and_subopts are responsible for calling | |
| 485 this method after handling initial parsing of their arguments. | |
| 486 This prevents commands with sub-commands as well as options from breaking | |
| 487 the parsing of getopt. | |
| 488 | 478 |
| 489 TODO: Provide a function to parse commands and sub-commands more | 479 Any commands in self._commands_with_subcommands_and_subopts are responsible |
| 490 intelligently once we stop allowing the deprecated command versions. | 480 for calling this method after handling initial parsing of their arguments. |
| 481 This prevents commands with sub-commands as well as options from breaking |
| 482 the parsing of getopt. |
| 483 |
| 484 TODO: Provide a function to parse commands and sub-commands more |
| 485 intelligently once we stop allowing the deprecated command versions. |
| 486 |
| 487 Raises: |
| 488 CommandException if the arguments don't match. |
| 491 """ | 489 """ |
| 492 | 490 |
| 493 if (not self.command_spec[FILE_URIS_OK] | 491 if (not self.command_spec.file_url_ok |
| 494 and self.HaveFileUris(self.args[self.command_spec[URIS_START_ARG]:])): | 492 and HaveFileUrls(self.args[self.command_spec.urls_start_arg:])): |
| 495 raise CommandException('"%s" command does not support "file://" URIs. ' | 493 raise CommandException('"%s" command does not support "file://" URLs. ' |
| 496 'Did you mean to use a gs:// URI?' % | 494 'Did you mean to use a gs:// URL?' % |
| 497 self.command_name) | 495 self.command_name) |
| 498 if (not self.command_spec[PROVIDER_URIS_OK] | 496 if (not self.command_spec.provider_url_ok |
| 499 and self._HaveProviderUris( | 497 and HaveProviderUrls(self.args[self.command_spec.urls_start_arg:])): |
| 500 self.args[self.command_spec[URIS_START_ARG]:])): | |
| 501 raise CommandException('"%s" command does not support provider-only ' | 498 raise CommandException('"%s" command does not support provider-only ' |
| 502 'URIs.' % self.command_name) | 499 'URLs.' % self.command_name) |
| 503 | 500 |
| 504 def WildcardIterator(self, uri_or_str, all_versions=False): | 501 def WildcardIterator(self, url_string, all_versions=False): |
| 505 """ | 502 """Helper to instantiate gslib.WildcardIterator. |
| 506 Helper to instantiate gslib.WildcardIterator. Args are same as | 503 |
| 507 gslib.WildcardIterator interface, but this method fills in most of the | 504 Args are same as gslib.WildcardIterator interface, but this method fills in |
| 508 values from instance state. | 505 most of the values from instance state. |
| 509 | 506 |
| 510 Args: | 507 Args: |
| 511 uri_or_str: StorageUri or URI string naming wildcard objects to iterate. | 508 url_string: URL string naming wildcard objects to iterate. |
| 509 all_versions: If true, the iterator yields all versions of objects |
| 510 matching the wildcard. If false, yields just the live |
| 511 object version. |
| 512 |
| 513 Returns: |
| 514 WildcardIterator for use by caller. |
| 512 """ | 515 """ |
| 513 return wildcard_iterator.wildcard_iterator( | 516 return CreateWildcardIterator( |
| 514 uri_or_str, self.proj_id_handler, | 517 url_string, self.gsutil_api, all_versions=all_versions, |
| 515 bucket_storage_uri_class=self.bucket_storage_uri_class, | 518 debug=self.debug, project_id=self.project_id) |
| 516 all_versions=all_versions, | |
| 517 headers=self.headers, debug=self.debug) | |
| 518 | 519 |
| 519 def RunCommand(self): | 520 def RunCommand(self): |
| 520 """Abstract function in base class. Subclasses must implement this. The | 521 """Abstract function in base class. Subclasses must implement this. |
| 521 return value of this function will be used as the exit status of the | 522 |
| 523 The return value of this function will be used as the exit status of the |
| 522 process, so subclass commands should return an integer exit code (0 for | 524 process, so subclass commands should return an integer exit code (0 for |
| 523 success, a value in [1,255] for failure). | 525 success, a value in [1,255] for failure). |
| 524 """ | 526 """ |
| 525 raise CommandException('Command %s is missing its RunCommand() ' | 527 raise CommandException('Command %s is missing its RunCommand() ' |
| 526 'implementation' % self.command_name) | 528 'implementation' % self.command_name) |
| 527 | 529 |
| 528 ############################################################ | 530 ############################################################ |
| 529 # Shared helper functions that depend on base class state. # | 531 # Shared helper functions that depend on base class state. # |
| 530 ############################################################ | 532 ############################################################ |
| 531 | 533 |
| 532 def UrisAreForSingleProvider(self, uri_args): | 534 def ApplyAclFunc(self, acl_func, acl_excep_handler, url_strs): |
| 533 """Tests whether the uris are all for a single provider. | 535 """Sets the standard or default object ACL depending on self.command_name. |
| 534 | 536 |
| 535 Returns: a StorageUri for one of the uris on success, None on failure. | 537 Args: |
| 538 acl_func: ACL function to be passed to Apply. |
| 539 acl_excep_handler: ACL exception handler to be passed to Apply. |
| 540 url_strs: URL strings on which to set ACL. |
| 541 |
| 542 Raises: |
| 543 CommandException if an ACL could not be set. |
| 536 """ | 544 """ |
| 537 provider = None | 545 multi_threaded_url_args = [] |
| 538 uri = None | 546 # Handle bucket ACL setting operations single-threaded, because |
| 539 for uri_str in uri_args: | 547 # our threading machinery currently assumes it's working with objects |
| 540 # validate=False because we allow wildcard uris. | 548 # (name_expansion_iterator), and normally we wouldn't expect users to need |
| 541 uri = boto.storage_uri( | 549 # to set ACLs on huge numbers of buckets at once anyway. |
| 542 uri_str, debug=self.debug, validate=False, | 550 for url_str in url_strs: |
| 543 bucket_storage_uri_class=self.bucket_storage_uri_class) | 551 url = StorageUrlFromString(url_str) |
| 544 if not provider: | 552 if url.IsCloudUrl() and url.IsBucket(): |
| 545 provider = uri.scheme | 553 if self.recursion_requested: |
| 546 elif uri.scheme != provider: | 554 # If user specified -R option, convert any bucket args to bucket |
| 547 return None | 555 # wildcards (e.g., gs://bucket/*), to prevent the operation from |
| 548 return uri | 556 # being applied to the buckets themselves. |
| 549 | 557 url.object_name = '*' |
| 550 def _SetAclFunc(self, name_expansion_result): | 558 multi_threaded_url_args.append(url.url_string) |
| 551 exp_src_uri = self.suri_builder.StorageUri( | 559 else: |
| 552 name_expansion_result.GetExpandedUriStr()) | 560 # Convert to a NameExpansionResult so we can re-use the threaded |
| 553 # We don't do bucket operations multi-threaded (see comment below). | 561 # function for the single-threaded implementation. RefType is unused. |
| 554 assert self.command_name != 'defacl' | 562 for blr in self.WildcardIterator(url.url_string).IterBuckets( |
| 555 self.logger.info('Setting ACL on %s...' % | 563 bucket_fields=['id']): |
| 556 name_expansion_result.expanded_uri_str) | 564 name_expansion_for_url = NameExpansionResult( |
| 557 try: | 565 url, False, False, blr.storage_url) |
| 558 if self.canned: | 566 acl_func(self, name_expansion_for_url) |
| 559 exp_src_uri.set_acl(self.acl_arg, exp_src_uri.object_name, False, | |
| 560 self.headers) | |
| 561 else: | 567 else: |
| 562 exp_src_uri.set_xml_acl(self.acl_arg, exp_src_uri.object_name, False, | 568 multi_threaded_url_args.append(url_str) |
| 563 self.headers) | 569 |
| 564 except GSResponseError as e: | 570 if len(multi_threaded_url_args) >= 1: |
| 565 if self.continue_on_error: | 571 name_expansion_iterator = NameExpansionIterator( |
| 566 exc_name, message, detail = util.ParseErrorDetail(e) | 572 self.command_name, self.debug, |
| 567 self.everything_set_okay = False | 573 self.logger, self.gsutil_api, |
| 568 sys.stderr.write(util.FormatErrorMessage( | 574 multi_threaded_url_args, self.recursion_requested, |
| 569 exc_name, e.status, e.code, e.reason, message, detail)) | 575 all_versions=self.all_versions, |
| 570 else: | 576 continue_on_error=self.continue_on_error or self.parallel_operations) |
| 577 |
| 578 # Perform requests in parallel (-m) mode, if requested, using |
| 579 # configured number of parallel processes and threads. Otherwise, |
| 580 # perform requests with sequential function calls in current process. |
| 581 self.Apply(acl_func, name_expansion_iterator, acl_excep_handler, |
| 582 fail_on_error=not self.continue_on_error) |
| 583 |
| 584 if not self.everything_set_okay and not self.continue_on_error: |
| 585 raise CommandException('ACLs for some objects could not be set.') |
| 586 |
| 587 def SetAclFunc(self, name_expansion_result, thread_state=None): |
| 588 """Sets the object ACL for the name_expansion_result provided. |
| 589 |
| 590 Args: |
| 591 name_expansion_result: NameExpansionResult describing the target object. |
| 592 thread_state: If present, use this gsutil Cloud API instance for the set. |
| 593 """ |
| 594 if thread_state: |
| 595 assert not self.def_acl |
| 596 gsutil_api = thread_state |
| 597 else: |
| 598 gsutil_api = self.gsutil_api |
| 599 op_string = 'default object ACL' if self.def_acl else 'ACL' |
| 600 url = name_expansion_result.expanded_storage_url |
| 601 self.logger.info('Setting %s on %s...', op_string, url) |
| 602 if ((gsutil_api.GetApiSelector(url.scheme) == ApiSelector.XML |
| 603 and url.scheme != 'gs') or self.canned): |
| 604 # If we are using canned ACLs or interacting with a non-google ACL |
| 605 # model, we need to use the XML passthrough. acl_arg should either |
| 606 # be a canned ACL or an XML ACL. |
| 607 try: |
| 608 # No canned ACL support in JSON, force XML API to be used. |
| 609 orig_prefer_api = gsutil_api.prefer_api |
| 610 gsutil_api.prefer_api = ApiSelector.XML |
| 611 gsutil_api.XmlPassThroughSetAcl( |
| 612 self.acl_arg, url, canned=self.canned, |
| 613 def_obj_acl=self.def_acl, provider=url.scheme) |
| 614 gsutil_api.prefer_api = orig_prefer_api |
| 615 except ServiceException as e: |
| 616 if self.continue_on_error: |
| 617 self.everything_set_okay = False |
| 618 self.logger.error(e) |
| 619 else: |
| 620 raise |
| 621 else: # Normal Cloud API path. ACL is a JSON ACL. |
| 622 try: |
| 623 if url.IsBucket(): |
| 624 if self.def_acl: |
| 625 def_obj_acl = AclTranslation.JsonToMessage( |
| 626 self.acl_arg, apitools_messages.ObjectAccessControl) |
| 627 bucket_metadata = apitools_messages.Bucket( |
| 628 defaultObjectAcl=def_obj_acl) |
| 629 gsutil_api.PatchBucket(url.bucket_name, bucket_metadata, |
| 630 provider=url.scheme, fields=['id']) |
| 631 else: |
| 632 bucket_acl = AclTranslation.JsonToMessage( |
| 633 self.acl_arg, apitools_messages.BucketAccessControl) |
| 634 bucket_metadata = apitools_messages.Bucket(acl=bucket_acl) |
| 635 gsutil_api.PatchBucket(url.bucket_name, bucket_metadata, |
| 636 provider=url.scheme, fields=['id']) |
| 637 else: # url.IsObject() |
| 638 object_acl = AclTranslation.JsonToMessage( |
| 639 self.acl_arg, apitools_messages.ObjectAccessControl) |
| 640 object_metadata = apitools_messages.Object(acl=object_acl) |
| 641 gsutil_api.PatchObjectMetadata(url.bucket_name, url.object_name, |
| 642 object_metadata, provider=url.scheme, |
| 643 generation=url.generation) |
| 644 except ArgumentException, e: |
| 645 raise |
| 646 except ServiceException, e: |
| 571 raise | 647 raise |
| 572 | 648 |
| 573 def SetAclCommandHelper(self): | 649 def SetAclCommandHelper(self, acl_func, acl_excep_handler): |
| 650 """Sets ACLs on the self.args using the passed-in acl function. |
| 651 |
| 652 Args: |
| 653 acl_func: ACL function to be passed to Apply. |
| 654 acl_excep_handler: ACL exception handler to be passed to Apply. |
| 574 """ | 655 """ |
| 575 Common logic for setting ACLs. Sets the standard ACL or the default | 656 acl_arg = self.args[0] |
| 576 object ACL depending on self.command_name. | 657 url_args = self.args[1:] |
| 577 """ | 658 # Disallow multi-provider setacl requests, because there are differences in |
| 578 | |
| 579 self.acl_arg = self.args[0] | |
| 580 uri_args = self.args[1:] | |
| 581 # Disallow multi-provider acl set requests, because there are differences in | |
| 582 # the ACL models. | 659 # the ACL models. |
| 583 storage_uri = self.UrisAreForSingleProvider(uri_args) | 660 if not UrlsAreForSingleProvider(url_args): |
| 584 if not storage_uri: | |
| 585 raise CommandException('"%s" command spanning providers not allowed.' % | 661 raise CommandException('"%s" command spanning providers not allowed.' % |
| 586 self.command_name) | 662 self.command_name) |
| 587 | 663 |
| 588 # Determine whether acl_arg names a file containing XML ACL text vs. the | 664 # Determine whether acl_arg names a file containing XML ACL text vs. the |
| 589 # string name of a canned ACL. | 665 # string name of a canned ACL. |
| 590 if os.path.isfile(self.acl_arg): | 666 if os.path.isfile(acl_arg): |
| 591 with codecs.open(self.acl_arg, 'r', 'utf-8') as f: | 667 with codecs.open(acl_arg, 'r', UTF8) as f: |
| 592 self.acl_arg = f.read() | 668 acl_arg = f.read() |
| 593 self.canned = False | 669 self.canned = False |
| 594 else: | 670 else: |
| 595 # No file exists, so expect a canned ACL string. | 671 # No file exists, so expect a canned ACL string. |
| 672 # Canned ACLs are not supported in JSON and we need to use the XML API |
| 673 # to set them. |
| 674 # validate=False because we allow wildcard urls. |
| 675 storage_uri = boto.storage_uri( |
| 676 url_args[0], debug=self.debug, validate=False, |
| 677 bucket_storage_uri_class=self.bucket_storage_uri_class) |
| 678 |
| 596 canned_acls = storage_uri.canned_acls() | 679 canned_acls = storage_uri.canned_acls() |
| 597 if self.acl_arg not in canned_acls: | 680 if acl_arg not in canned_acls: |
| 598 raise CommandException('Invalid canned ACL "%s".' % self.acl_arg) | 681 raise CommandException('Invalid canned ACL "%s".' % acl_arg) |
| 599 self.canned = True | 682 self.canned = True |
| 600 | 683 |
| 601 # Used to track if any ACLs failed to be set. | 684 # Used to track if any ACLs failed to be set. |
| 602 self.everything_set_okay = True | 685 self.everything_set_okay = True |
| 686 self.acl_arg = acl_arg |
| 603 | 687 |
| 604 # If user specified -R option, convert any bucket args to bucket wildcards | 688 self.ApplyAclFunc(acl_func, acl_excep_handler, url_args) |
| 605 # (e.g., gs://bucket/*), to prevent the operation from being applied to | |
| 606 # the buckets themselves. | |
| 607 if self.recursion_requested: | |
| 608 for i in range(len(uri_args)): | |
| 609 uri = self.suri_builder.StorageUri(uri_args[i]) | |
| 610 if uri.names_bucket(): | |
| 611 uri_args[i] = uri.clone_replace_name('*').uri | |
| 612 else: | |
| 613 # Handle bucket ACL setting operations single-threaded, because | |
| 614 # our threading machinery currently assumes it's working with objects | |
| 615 # (name_expansion_iterator), and normally we wouldn't expect users to need | |
| 616 # to set ACLs on huge numbers of buckets at once anyway. | |
| 617 for i in range(len(uri_args)): | |
| 618 uri_str = uri_args[i] | |
| 619 if self.suri_builder.StorageUri(uri_str).names_bucket(): | |
| 620 self._RunSingleThreadedSetAcl(self.acl_arg, uri_args) | |
| 621 return | |
| 622 | |
| 623 name_expansion_iterator = NameExpansionIterator( | |
| 624 self.command_name, self.proj_id_handler, self.headers, self.debug, | |
| 625 self.logger, self.bucket_storage_uri_class, uri_args, | |
| 626 self.recursion_requested, self.recursion_requested, | |
| 627 all_versions=self.all_versions) | |
| 628 # Perform requests in parallel (-m) mode, if requested, using | |
| 629 # configured number of parallel processes and threads. Otherwise, | |
| 630 # perform requests with sequential function calls in current process. | |
| 631 self.Apply(_SetAclFuncWrapper, name_expansion_iterator, | |
| 632 _SetAclExceptionHandler) | |
| 633 | |
| 634 if not self.everything_set_okay and not self.continue_on_error: | 689 if not self.everything_set_okay and not self.continue_on_error: |
| 635 raise CommandException('ACLs for some objects could not be set.') | 690 raise CommandException('ACLs for some objects could not be set.') |
| 636 | 691 |
| 637 def _RunSingleThreadedSetAcl(self, acl_arg, uri_args): | 692 def _WarnServiceAccounts(self): |
| 638 some_matched = False | 693 """Warns service account users who have received an AccessDenied error. |
| 639 for uri_str in uri_args: | |
| 640 for blr in self.WildcardIterator(uri_str): | |
| 641 if blr.HasPrefix(): | |
| 642 continue | |
| 643 some_matched = True | |
| 644 uri = blr.GetUri() | |
| 645 if self.command_name == 'defacl': | |
| 646 self.logger.info('Setting default object ACL on %s...', uri) | |
| 647 if self.canned: | |
| 648 uri.set_def_acl(acl_arg, uri.object_name, False, self.headers) | |
| 649 else: | |
| 650 uri.set_def_xml_acl(acl_arg, False, self.headers) | |
| 651 else: | |
| 652 self.logger.info('Setting ACL on %s...', uri) | |
| 653 if self.canned: | |
| 654 uri.set_acl(acl_arg, uri.object_name, False, self.headers) | |
| 655 else: | |
| 656 uri.set_xml_acl(acl_arg, uri.object_name, False, self.headers) | |
| 657 if not some_matched: | |
| 658 raise CommandException('No URIs matched') | |
| 659 | 694 |
| 660 def _WarnServiceAccounts(self): | 695 When one of the metadata-related commands fails due to AccessDenied, user |
| 661 """Warns service account users who have received an AccessDenied error for | 696 must ensure that they are listed as an Owner in the API console. |
| 662 one of the metadata-related commands to make sure that they are listed as | 697 """ |
| 663 Owners in the API console.""" | 698 # Import this here so that the value will be set first in |
| 664 | 699 # gcs_oauth2_boto_plugin. |
| 665 # Import this here so that the value will be set first in oauth2_plugin. | 700 # pylint: disable=g-import-not-at-top |
| 666 from gslib.third_party.oauth2_plugin.oauth2_plugin import IS_SERVICE_ACCOUNT | 701 from gcs_oauth2_boto_plugin.oauth2_plugin import IS_SERVICE_ACCOUNT |
| 667 | 702 |
| 668 if IS_SERVICE_ACCOUNT: | 703 if IS_SERVICE_ACCOUNT: |
| 669 # This method is only called when canned ACLs are used, so the warning | 704 # This method is only called when canned ACLs are used, so the warning |
| 670 # definitely applies. | 705 # definitely applies. |
| 671 self.logger.warning('\n'.join(textwrap.wrap( | 706 self.logger.warning('\n'.join(textwrap.wrap( |
| 672 'It appears that your service account has been denied access while ' | 707 'It appears that your service account has been denied access while ' |
| 673 'attempting to perform a metadata operation. If you believe that you ' | 708 'attempting to perform a metadata operation. If you believe that you ' |
| 674 'should have access to this metadata (i.e., if it is associated with ' | 709 'should have access to this metadata (i.e., if it is associated with ' |
| 675 'your account), please make sure that your service account''s email ' | 710 'your account), please make sure that your service account''s email ' |
| 676 'address is listed as an Owner in the Team tab of the API console. ' | 711 'address is listed as an Owner in the Team tab of the API console. ' |
| 677 'See "gsutil help creds" for further information.\n'))) | 712 'See "gsutil help creds" for further information.\n'))) |
| 678 | 713 |
| 679 def GetAclCommandHelper(self): | 714 def GetAndPrintAcl(self, url_str): |
| 680 """Common logic for getting ACLs. Gets the standard ACL or the default | 715 """Prints the standard or default object ACL depending on self.command_name. |
| 681 object ACL depending on self.command_name.""" | |
| 682 | |
| 683 # Resolve to just one object. | |
| 684 # Handle wildcard-less URI specially in case this is a version-specific | |
| 685 # URI, because WildcardIterator().IterUris() would lose the versioning info. | |
| 686 if not ContainsWildcard(self.args[0]): | |
| 687 uri = self.suri_builder.StorageUri(self.args[0]) | |
| 688 else: | |
| 689 uris = list(self.WildcardIterator(self.args[0]).IterUris()) | |
| 690 if len(uris) == 0: | |
| 691 raise CommandException('No URIs matched') | |
| 692 if len(uris) != 1: | |
| 693 raise CommandException('%s matched more than one URI, which is not ' | |
| 694 'allowed by the %s command' % (self.args[0], self.command_name)) | |
| 695 uri = uris[0] | |
| 696 if not uri.names_bucket() and not uri.names_object(): | |
| 697 raise CommandException('"%s" command must specify a bucket or ' | |
| 698 'object.' % self.command_name) | |
| 699 if self.command_name == 'defacl': | |
| 700 acl = uri.get_def_acl(False, self.headers) | |
| 701 else: | |
| 702 acl = uri.get_acl(False, self.headers) | |
| 703 # Pretty-print the XML to make it more easily human editable. | |
| 704 parsed_xml = xml.dom.minidom.parseString(acl.to_xml().encode('utf-8')) | |
| 705 print parsed_xml.toprettyxml(indent=' ').encode('utf-8') | |
| 706 | |
| 707 def GetXmlSubresource(self, subresource, uri_arg): | |
| 708 """Print an xml subresource, e.g. logging, for a bucket/object. | |
| 709 | 716 |
| 710 Args: | 717 Args: |
| 711 subresource: The subresource name. | 718 url_str: URL string to get ACL for. |
| 712 uri_arg: URI for the bucket/object. Wildcards will be expanded. | 719 """ |
| 720 blr = self.GetAclCommandBucketListingReference(url_str) |
| 721 url = StorageUrlFromString(url_str) |
| 722 if (self.gsutil_api.GetApiSelector(url.scheme) == ApiSelector.XML |
| 723 and url.scheme != 'gs'): |
| 724 # Need to use XML passthrough. |
| 725 try: |
| 726 acl = self.gsutil_api.XmlPassThroughGetAcl( |
| 727 url, def_obj_acl=self.def_acl, provider=url.scheme) |
| 728 print acl.to_xml() |
| 729 except AccessDeniedException, _: |
| 730 self._WarnServiceAccounts() |
| 731 raise |
| 732 else: |
| 733 if self.command_name == 'defacl': |
| 734 acl = blr.root_object.defaultObjectAcl |
| 735 if not acl: |
| 736 self.logger.warn( |
| 737 'No default object ACL present for %s. This could occur if ' |
| 738 'the default object ACL is private, in which case objects ' |
| 739 'created in this bucket will be readable only by their ' |
| 740 'creators. It could also mean you do not have OWNER permission ' |
| 741 'on %s and therefore do not have permission to read the ' |
| 742 'default object ACL.', url_str, url_str) |
| 743 else: |
| 744 acl = blr.root_object.acl |
| 745 if not acl: |
| 746 self._WarnServiceAccounts() |
| 747 raise AccessDeniedException('Access denied. Please ensure you have ' |
| 748 'OWNER permission on %s.' % url_str) |
| 749 print AclTranslation.JsonFromMessage(acl) |
| 750 |
| 751 def GetAclCommandBucketListingReference(self, url_str): |
| 752 """Gets a single bucket listing reference for an acl get command. |
| 753 |
| 754 Args: |
| 755 url_str: URL string to get the bucket listing reference for. |
| 756 |
| 757 Returns: |
| 758 BucketListingReference for the URL string. |
| 713 | 759 |
| 714 Raises: | 760 Raises: |
| 715 CommandException: if errors encountered. | 761 CommandException if string did not result in exactly one reference. |
| 716 """ | 762 """ |
| 717 # Wildcarding is allowed but must resolve to just one bucket. | 763 # We're guaranteed by caller that we have the appropriate type of url |
| 718 uris = list(self.WildcardIterator(uri_arg).IterUris()) | 764 # string for the call (ex. we will never be called with an object string |
| 719 if len(uris) != 1: | 765 # by getdefacl) |
| 720 raise CommandException('Wildcards must resolve to exactly one item for ' | 766 wildcard_url = StorageUrlFromString(url_str) |
| 721 'get %s' % subresource) | 767 if wildcard_url.IsObject(): |
| 722 uri = uris[0] | 768 plurality_iter = PluralityCheckableIterator( |
| 723 xml_str = uri.get_subresource(subresource, False, self.headers) | 769 self.WildcardIterator(url_str).IterObjects( |
| 724 # Pretty-print the XML to make it more easily human editable. | 770 bucket_listing_fields=['acl'])) |
| 725 parsed_xml = xml.dom.minidom.parseString(xml_str.encode('utf-8')) | 771 else: |
| 726 print parsed_xml.toprettyxml(indent=' ') | 772 # Bucket or provider. We call IterBuckets explicitly here to ensure that |
| 773 # the root object is populated with the acl. |
| 774 if self.command_name == 'defacl': |
| 775 bucket_fields = ['defaultObjectAcl'] |
| 776 else: |
| 777 bucket_fields = ['acl'] |
| 778 plurality_iter = PluralityCheckableIterator( |
| 779 self.WildcardIterator(url_str).IterBuckets( |
| 780 bucket_fields=bucket_fields)) |
| 781 if plurality_iter.IsEmpty(): |
| 782 raise CommandException('No URLs matched') |
| 783 if plurality_iter.HasPlurality(): |
| 784 raise CommandException( |
| 785 '%s matched more than one URL, which is not allowed by the %s ' |
| 786 'command' % (url_str, self.command_name)) |
| 787 return list(plurality_iter)[0] |
| 727 | 788 |
| 728 def _HandleMultiProcessingControlC(self, signal_num, cur_stack_frame): | 789 def _HandleMultiProcessingControlC(self, unused_signal_num, |
| 729 """Called when user hits ^C during a multi-processing/multi-threaded | 790 unused_cur_stack_frame): |
| 730 request, so we can kill the subprocesses.""" | 791 """Called when user hits ^C during a multi-process/multi-thread request. |
| 792 |
| 793 Kills subprocesses. |
| 794 |
| 795 Args: |
| 796 unused_signal_num: signal generated by ^C. |
| 797 unused_cur_stack_frame: Current stack frame. |
| 798 """ |
| 731 # Note: This only works under Linux/MacOS. See | 799 # Note: This only works under Linux/MacOS. See |
| 732 # https://github.com/GoogleCloudPlatform/gsutil/issues/99 for details | 800 # https://github.com/GoogleCloudPlatform/gsutil/issues/99 for details |
| 733 # about why making it work correctly across OS's is harder and still open. | 801 # about why making it work correctly across OS's is harder and still open. |
| 734 ShutDownGsutil() | 802 ShutDownGsutil() |
| 735 sys.stderr.write('Caught ^C - exiting\n') | 803 sys.stderr.write('Caught ^C - exiting\n') |
| 736 # Simply calling sys.exit(1) doesn't work - see above bug for details. | 804 # Simply calling sys.exit(1) doesn't work - see above bug for details. |
| 737 KillProcess(os.getpid()) | 805 KillProcess(os.getpid()) |
| 738 | 806 |
| 739 def HaveFileUris(self, args_to_check): | 807 def GetSingleBucketUrlFromArg(self, arg, bucket_fields=None): |
| 740 """Checks whether args_to_check contain any file URIs. | 808 """Gets a single bucket URL based on the command arguments. |
| 741 | 809 |
| 742 Args: | 810 Args: |
| 743 args_to_check: Command-line argument subset to check. | 811 arg: String argument to get bucket URL for. |
| 812 bucket_fields: Fields to populate for the bucket. |
| 744 | 813 |
| 745 Returns: | 814 Returns: |
| 746 True if args_to_check contains any file URIs. | 815 (StorageUrl referring to a single bucket, Bucket metadata). |
| 816 |
| 817 Raises: |
| 818 CommandException if args did not match exactly one bucket. |
| 747 """ | 819 """ |
| 748 for uri_str in args_to_check: | 820 plurality_checkable_iterator = self.GetBucketUrlIterFromArg( |
| 749 if uri_str.lower().startswith('file://') or uri_str.find(':') == -1: | 821 arg, bucket_fields=bucket_fields) |
| 750 return True | 822 if plurality_checkable_iterator.HasPlurality(): |
| 751 return False | 823 raise CommandException( |
| 824 '%s matched more than one URL, which is not\n' |
| 825 'allowed by the %s command' % (arg, self.command_name)) |
| 826 blr = list(plurality_checkable_iterator)[0] |
| 827 return StorageUrlFromString(blr.url_string), blr.root_object |
| 828 |
| 829 def GetBucketUrlIterFromArg(self, arg, bucket_fields=None): |
| 830 """Gets a single bucket URL based on the command arguments. |
| 831 |
| 832 Args: |
| 833 arg: String argument to iterate over. |
| 834 bucket_fields: Fields to populate for the bucket. |
| 835 |
| 836 Returns: |
| 837 PluralityCheckableIterator over buckets. |
| 838 |
| 839 Raises: |
| 840 CommandException if iterator matched no buckets. |
| 841 """ |
| 842 arg_url = StorageUrlFromString(arg) |
| 843 if not arg_url.IsCloudUrl() or arg_url.IsObject(): |
| 844 raise CommandException('"%s" command must specify a bucket' % |
| 845 self.command_name) |
| 846 |
| 847 plurality_checkable_iterator = PluralityCheckableIterator( |
| 848 self.WildcardIterator(arg).IterBuckets( |
| 849 bucket_fields=bucket_fields)) |
| 850 if plurality_checkable_iterator.IsEmpty(): |
| 851 raise CommandException('No URLs matched') |
| 852 return plurality_checkable_iterator |
| 752 | 853 |
| 753 ###################### | 854 ###################### |
| 754 # Private functions. # | 855 # Private functions. # |
| 755 ###################### | 856 ###################### |
| 756 | 857 |
| 757 def _HaveProviderUris(self, args_to_check): | |
| 758 """Checks whether args_to_check contains any provider URIs (like 'gs://'). | |
| 759 | |
| 760 Args: | |
| 761 args_to_check: Command-line argument subset to check. | |
| 762 | |
| 763 Returns: | |
| 764 True if args_to_check contains any provider URIs. | |
| 765 """ | |
| 766 for uri_str in args_to_check: | |
| 767 if re.match('^[a-z]+://$', uri_str): | |
| 768 return True | |
| 769 return False | |
| 770 | |
| 771 def _ResetConnectionPool(self): | 858 def _ResetConnectionPool(self): |
| 772 # Each OS process needs to establish its own set of connections to | 859 # Each OS process needs to establish its own set of connections to |
| 773 # the server to avoid writes from different OS processes interleaving | 860 # the server to avoid writes from different OS processes interleaving |
| 774 # onto the same socket (and garbling the underlying SSL session). | 861 # onto the same socket (and garbling the underlying SSL session). |
| 775 # We ensure each process gets its own set of connections here by | 862 # We ensure each process gets its own set of connections here by |
| 776 # closing all connections in the storage provider connection pool. | 863 # closing all connections in the storage provider connection pool. |
| 777 connection_pool = StorageUri.provider_pool | 864 connection_pool = StorageUri.provider_pool |
| 778 if connection_pool: | 865 if connection_pool: |
| 779 for i in connection_pool: | 866 for i in connection_pool: |
| 780 connection_pool[i].connection.close() | 867 connection_pool[i].connection.close() |
| 781 | 868 |
| 782 def _GetProcessAndThreadCount(self, process_count, thread_count, | 869 def _GetProcessAndThreadCount(self, process_count, thread_count, |
| 783 parallel_operations_override): | 870 parallel_operations_override): |
| 784 """ | 871 """Determines the values of process_count and thread_count. |
| 785 Determines the values of process_count and thread_count that we should | 872 |
| 786 actually use. If we're not performing operations in parallel, then ignore | 873 These values are used for parallel operations. |
| 874 If we're not performing operations in parallel, then ignore |
| 787 existing values and use process_count = thread_count = 1. | 875 existing values and use process_count = thread_count = 1. |
| 788 | 876 |
| 789 Args: | 877 Args: |
| 790 process_count: A positive integer or None. In the latter case, we read | 878 process_count: A positive integer or None. In the latter case, we read |
| 791 the value from the .boto config file. | 879 the value from the .boto config file. |
| 792 thread_count: A positive integer or None. In the latter case, we read | 880 thread_count: A positive integer or None. In the latter case, we read |
| 793 the value from the .boto config file. | 881 the value from the .boto config file. |
| 794 parallel_operations_override: Used to override self.parallel_operations. | 882 parallel_operations_override: Used to override self.parallel_operations. |
| 795 This allows the caller to safely override | 883 This allows the caller to safely override |
| 796 the top-level flag for a single call. | 884 the top-level flag for a single call. |
| (...skipping 18 matching lines...) Expand all Loading... |
| 815 gslib.commands.config.DEFAULT_PARALLEL_THREAD_COUNT) | 903 gslib.commands.config.DEFAULT_PARALLEL_THREAD_COUNT) |
| 816 if thread_count < 1: | 904 if thread_count < 1: |
| 817 raise CommandException('Invalid parallel_thread_count "%d".' % | 905 raise CommandException('Invalid parallel_thread_count "%d".' % |
| 818 thread_count) | 906 thread_count) |
| 819 else: | 907 else: |
| 820 # If -m not specified, then assume 1 OS process and 1 Python thread. | 908 # If -m not specified, then assume 1 OS process and 1 Python thread. |
| 821 process_count = 1 | 909 process_count = 1 |
| 822 thread_count = 1 | 910 thread_count = 1 |
| 823 | 911 |
| 824 if IS_WINDOWS and process_count > 1: | 912 if IS_WINDOWS and process_count > 1: |
| 825 raise CommandException('\n'.join(textwrap.wrap(( | 913 raise CommandException('\n'.join(textwrap.wrap( |
| 826 'It is not possible to set process_count > 1 on Windows. Please ' | 914 ('It is not possible to set process_count > 1 on Windows. Please ' |
| 827 'update your config file (located at %s) and set ' | 915 'update your config file (located at %s) and set ' |
| 828 '"process_count = 1".') % | 916 '"parallel_process_count = 1".') % |
| 829 GetConfigFilePath()))) | 917 GetConfigFilePath()))) |
| 830 self.logger.debug('process count: %d', process_count) | 918 self.logger.debug('process count: %d', process_count) |
| 831 self.logger.debug('thread count: %d', thread_count) | 919 self.logger.debug('thread count: %d', thread_count) |
| 832 | 920 |
| 833 return (process_count, thread_count) | 921 return (process_count, thread_count) |
| 834 | 922 |
| 835 def _SetUpPerCallerState(self): | 923 def _SetUpPerCallerState(self): |
| 836 """Set up the state for a caller id, corresponding to one Apply call.""" | 924 """Set up the state for a caller id, corresponding to one Apply call.""" |
| 837 # Get a new caller ID. | 925 # Get a new caller ID. |
| 838 with caller_id_lock: | 926 with caller_id_lock: |
| 839 caller_id_counter.value = caller_id_counter.value + 1 | 927 caller_id_counter.value += 1 |
| 840 caller_id = caller_id_counter.value | 928 caller_id = caller_id_counter.value |
| 841 | 929 |
| 842 # Create a copy of self with an incremented recursive level. This allows | 930 # Create a copy of self with an incremented recursive level. This allows |
| 843 # the class to report its level correctly if the function called from it | 931 # the class to report its level correctly if the function called from it |
| 844 # also needs to call Apply. | 932 # also needs to call Apply. |
| 845 cls = copy.copy(self) | 933 cls = copy.copy(self) |
| 846 cls.recursive_apply_level += 1 | 934 cls.recursive_apply_level += 1 |
| 847 | 935 |
| 848 # Thread-safe loggers can't be pickled, so we will remove it here and | 936 # Thread-safe loggers can't be pickled, so we will remove it here and |
| 849 # recreate it later in the WorkerThread. This is not a problem since any | 937 # recreate it later in the WorkerThread. This is not a problem since any |
| 850 # logger with the same name will be treated as a singleton. | 938 # logger with the same name will be treated as a singleton. |
| 851 cls.logger = None | 939 cls.logger = None |
| 940 |
| 941 # Likewise, the default API connection can't be pickled, but it is unused |
| 942 # anyway as each thread gets its own API delegator. |
| 943 cls.gsutil_api = None |
| 944 |
| 852 class_map[caller_id] = cls | 945 class_map[caller_id] = cls |
| 853 | |
| 854 total_tasks[caller_id] = -1 # -1 => the producer hasn't finished yet. | 946 total_tasks[caller_id] = -1 # -1 => the producer hasn't finished yet. |
| 855 call_completed_map[caller_id] = False | 947 call_completed_map[caller_id] = False |
| 856 caller_id_finished_count.put(caller_id, 0) | 948 caller_id_finished_count.Put(caller_id, 0) |
| 857 global_return_values_map.put(caller_id, []) | 949 global_return_values_map.Put(caller_id, []) |
| 858 return caller_id | 950 return caller_id |
| 859 | 951 |
| 860 def _CreateNewConsumerPool(self, num_processes, num_threads): | 952 def _CreateNewConsumerPool(self, num_processes, num_threads): |
| 861 """Create a new pool of processes that call _ApplyThreads.""" | 953 """Create a new pool of processes that call _ApplyThreads.""" |
| 862 processes = [] | 954 processes = [] |
| 863 task_queue = _NewMultiprocessingQueue() | 955 task_queue = _NewMultiprocessingQueue() |
| 864 task_queues.append(task_queue) | 956 task_queues.append(task_queue) |
| 865 | 957 |
| 866 current_max_recursive_level.value = current_max_recursive_level.value + 1 | 958 current_max_recursive_level.value += 1 |
| 867 if current_max_recursive_level.value > MAX_RECURSIVE_DEPTH: | 959 if current_max_recursive_level.value > MAX_RECURSIVE_DEPTH: |
| 868 raise CommandException('Recursion depth of Apply calls is too great.') | 960 raise CommandException('Recursion depth of Apply calls is too great.') |
| 869 for shard in range(num_processes): | 961 for _ in range(num_processes): |
| 870 recursive_apply_level = len(consumer_pools) | 962 recursive_apply_level = len(consumer_pools) |
| 871 p = multiprocessing.Process( | 963 p = multiprocessing.Process( |
| 872 target=self._ApplyThreads, | 964 target=self._ApplyThreads, |
| 873 args=(num_threads, recursive_apply_level, shard)) | 965 args=(num_threads, num_processes, recursive_apply_level)) |
| 874 p.daemon = True | 966 p.daemon = True |
| 875 processes.append(p) | 967 processes.append(p) |
| 876 p.start() | 968 p.start() |
| 877 consumer_pool = _ConsumerPool(processes, task_queue) | 969 consumer_pool = _ConsumerPool(processes, task_queue) |
| 878 consumer_pools.append(consumer_pool) | 970 consumer_pools.append(consumer_pool) |
| 879 | 971 |
| 880 def Apply(self, func, args_iterator, exception_handler, | 972 def Apply(self, func, args_iterator, exception_handler, |
| 881 shared_attrs=None, arg_checker=_UriArgChecker, | 973 shared_attrs=None, arg_checker=_UrlArgChecker, |
| 882 parallel_operations_override=False, process_count=None, | 974 parallel_operations_override=False, process_count=None, |
| 883 thread_count=None, should_return_results=False, | 975 thread_count=None, should_return_results=False, |
| 884 fail_on_error=False): | 976 fail_on_error=False): |
| 885 """ | 977 """Calls _Parallel/SequentialApply based on multiprocessing availability. |
| 886 Determines whether the necessary parts of the multiprocessing module are | |
| 887 available, and delegates to _ParallelApply or _SequentialApply as | |
| 888 appropriate. | |
| 889 | 978 |
| 890 Args: | 979 Args: |
| 891 func: Function to call to process each argument. | 980 func: Function to call to process each argument. |
| 892 args_iterator: Iterable collection of arguments to be put into the | 981 args_iterator: Iterable collection of arguments to be put into the |
| 893 work queue. | 982 work queue. |
| 894 exception_handler: Exception handler for WorkerThread class. | 983 exception_handler: Exception handler for WorkerThread class. |
| 895 shared_attrs: List of attributes to manage across sub-processes. | 984 shared_attrs: List of attributes to manage across sub-processes. |
| 896 arg_checker: Used to determine whether we should process the current | 985 arg_checker: Used to determine whether we should process the current |
| 897 argument or simply skip it. Also handles any logging that | 986 argument or simply skip it. Also handles any logging that |
| 898 is specific to a particular type of argument. | 987 is specific to a particular type of argument. |
| 899 parallel_operations_override: Used to override self.parallel_operations. | 988 parallel_operations_override: Used to override self.parallel_operations. |
| 900 This allows the caller to safely override | 989 This allows the caller to safely override |
| 901 the top-level flag for a single call. | 990 the top-level flag for a single call. |
| 902 process_count: The number of processes to use. If not specified, then | 991 process_count: The number of processes to use. If not specified, then |
| 903 the configured default will be used. | 992 the configured default will be used. |
| 904 thread_count: The number of threads per process. If not specified, then | 993 thread_count: The number of threads per process. If not specified, then |
| 905 the configured default will be used. | 994 the configured default will be used. |
| 906 should_return_results: If true, then return the results of all successful | 995 should_return_results: If true, then return the results of all successful |
| 907 calls to func in a list. | 996 calls to func in a list. |
| 908 fail_on_error: If true, then raise any exceptions encountered when | 997 fail_on_error: If true, then raise any exceptions encountered when |
| 909 executing func. This is only applicable in the case of | 998 executing func. This is only applicable in the case of |
| 910 process_count == thread_count == 1. | 999 process_count == thread_count == 1. |
| 1000 |
| 1001 Returns: |
| 1002 Results from spawned threads. |
| 911 """ | 1003 """ |
| 912 if shared_attrs: | 1004 if shared_attrs: |
| 913 original_shared_vars_values = {} # We'll add these back in at the end. | 1005 original_shared_vars_values = {} # We'll add these back in at the end. |
| 914 for name in shared_attrs: | 1006 for name in shared_attrs: |
| 915 original_shared_vars_values[name] = getattr(self, name) | 1007 original_shared_vars_values[name] = getattr(self, name) |
| 916 # By setting this to 0, we simplify the logic for computing deltas. | 1008 # By setting this to 0, we simplify the logic for computing deltas. |
| 917 # We'll add it back after all of the tasks have been performed. | 1009 # We'll add it back after all of the tasks have been performed. |
| 918 setattr(self, name, 0) | 1010 setattr(self, name, 0) |
| 919 | 1011 |
| 920 (process_count, thread_count) = self._GetProcessAndThreadCount( | 1012 (process_count, thread_count) = self._GetProcessAndThreadCount( |
| 921 process_count, thread_count, parallel_operations_override) | 1013 process_count, thread_count, parallel_operations_override) |
| 1014 |
| 922 is_main_thread = (self.recursive_apply_level == 0 | 1015 is_main_thread = (self.recursive_apply_level == 0 |
| 923 and self.sequential_caller_id == -1) | 1016 and self.sequential_caller_id == -1) |
| 924 | 1017 |
| 925 # We don't honor the fail_on_error flag in the case of multiple threads | 1018 # We don't honor the fail_on_error flag in the case of multiple threads |
| 926 # or processes. | 1019 # or processes. |
| 927 fail_on_error = fail_on_error and (process_count * thread_count == 1) | 1020 fail_on_error = fail_on_error and (process_count * thread_count == 1) |
| 928 | 1021 |
| 929 # Only check this from the first call in the main thread. Apart from the | 1022 # Only check this from the first call in the main thread. Apart from the |
| 930 # fact that it's wasteful to try this multiple times in general, it also | 1023 # fact that it's wasteful to try this multiple times in general, it also |
| 931 # will never work when called from a subprocess since we use daemon | 1024 # will never work when called from a subprocess since we use daemon |
| 932 # processes, and daemons can't create other processes. | 1025 # processes, and daemons can't create other processes. |
| 933 if is_main_thread: | 1026 if is_main_thread: |
| 934 if ((not self.multiprocessing_is_available) | 1027 if ((not self.multiprocessing_is_available) |
| 935 and thread_count * process_count > 1): | 1028 and thread_count * process_count > 1): |
| 936 # Run the check again and log the appropriate warnings. This was run | 1029 # Run the check again and log the appropriate warnings. This was run |
| 937 # before, when the Command object was created, in order to calculate | 1030 # before, when the Command object was created, in order to calculate |
| 938 # self.multiprocessing_is_available, but we don't want to print the | 1031 # self.multiprocessing_is_available, but we don't want to print the |
| 939 # warning until we're sure the user actually tried to use multiple | 1032 # warning until we're sure the user actually tried to use multiple |
| 940 # threads or processes. | 1033 # threads or processes. |
| 941 MultiprocessingIsAvailable(logger=self.logger) | 1034 MultiprocessingIsAvailable(logger=self.logger) |
| 942 | 1035 |
| 943 if self.multiprocessing_is_available: | 1036 if self.multiprocessing_is_available: |
| 944 caller_id = self._SetUpPerCallerState() | 1037 caller_id = self._SetUpPerCallerState() |
| 945 else: | 1038 else: |
| 946 self.sequential_caller_id += 1 | 1039 self.sequential_caller_id += 1 |
| 947 caller_id = self.sequential_caller_id | 1040 caller_id = self.sequential_caller_id |
| 948 | 1041 |
| 949 if is_main_thread: | 1042 if is_main_thread: |
| 950 global global_return_values_map, shared_vars_map | 1043 # pylint: disable=global-variable-undefined |
| 1044 global global_return_values_map, shared_vars_map, failure_count |
| 951 global caller_id_finished_count, shared_vars_list_map | 1045 global caller_id_finished_count, shared_vars_list_map |
| 952 global_return_values_map = BasicIncrementDict() | 1046 global_return_values_map = BasicIncrementDict() |
| 953 global_return_values_map.put(caller_id, []) | 1047 global_return_values_map.Put(caller_id, []) |
| 954 shared_vars_map = BasicIncrementDict() | 1048 shared_vars_map = BasicIncrementDict() |
| 955 caller_id_finished_count = BasicIncrementDict() | 1049 caller_id_finished_count = BasicIncrementDict() |
| 956 shared_vars_list_map = {} | 1050 shared_vars_list_map = {} |
| 957 | 1051 failure_count = 0 |
| 958 | 1052 |
| 959 # If any shared attributes passed by caller, create a dictionary of | 1053 # If any shared attributes passed by caller, create a dictionary of |
| 960 # shared memory variables for every element in the list of shared | 1054 # shared memory variables for every element in the list of shared |
| 961 # attributes. | 1055 # attributes. |
| 962 if shared_attrs: | 1056 if shared_attrs: |
| 963 shared_vars_list_map[caller_id] = shared_attrs | 1057 shared_vars_list_map[caller_id] = shared_attrs |
| 964 for name in shared_attrs: | 1058 for name in shared_attrs: |
| 965 shared_vars_map.put((caller_id, name), 0) | 1059 shared_vars_map.Put((caller_id, name), 0) |
| 966 | 1060 |
| 967 # Make all of the requested function calls. | 1061 # Make all of the requested function calls. |
| 968 if self.multiprocessing_is_available and thread_count * process_count > 1: | 1062 if self.multiprocessing_is_available and thread_count * process_count > 1: |
| 969 self._ParallelApply(func, args_iterator, exception_handler, caller_id, | 1063 self._ParallelApply(func, args_iterator, exception_handler, caller_id, |
| 970 arg_checker, parallel_operations_override, | 1064 arg_checker, process_count, thread_count, |
| 971 process_count, thread_count, should_return_results, | 1065 should_return_results, fail_on_error) |
| 972 fail_on_error) | |
| 973 else: | 1066 else: |
| 974 self._SequentialApply(func, args_iterator, exception_handler, caller_id, | 1067 self._SequentialApply(func, args_iterator, exception_handler, caller_id, |
| 975 arg_checker, should_return_results, fail_on_error) | 1068 arg_checker, should_return_results, fail_on_error) |
| 976 | 1069 |
| 977 if shared_attrs: | 1070 if shared_attrs: |
| 978 for name in shared_attrs: | 1071 for name in shared_attrs: |
| 979 # This allows us to retain the original value of the shared variable, | 1072 # This allows us to retain the original value of the shared variable, |
| 980 # and simply apply the delta after what was done during the call to | 1073 # and simply apply the delta after what was done during the call to |
| 981 # apply. | 1074 # apply. |
| 982 final_value = (original_shared_vars_values[name] + | 1075 final_value = (original_shared_vars_values[name] + |
| 983 shared_vars_map.get((caller_id, name))) | 1076 shared_vars_map.Get((caller_id, name))) |
| 984 setattr(self, name, final_value) | 1077 setattr(self, name, final_value) |
| 985 | 1078 |
| 986 if should_return_results: | 1079 if should_return_results: |
| 987 return global_return_values_map.get(caller_id) | 1080 return global_return_values_map.Get(caller_id) |
| 988 | 1081 |
| 1082 def _MaybeSuggestGsutilDashM(self): |
| 1083 """Outputs a suggestion to the user to use gsutil -m.""" |
| 1084 if not (boto.config.getint('GSUtil', 'parallel_process_count', 0) == 1 and |
| 1085 boto.config.getint('GSUtil', 'parallel_thread_count', 0) == 1): |
| 1086 self.logger.warn('\n' + textwrap.fill( |
| 1087 '==> NOTE: You are performing a sequence of gsutil operations that ' |
| 1088 'may run significantly faster if you instead use gsutil -m %s ...\n' |
| 1089 'Please see the -m section under "gsutil help options" for further ' |
| 1090 'information about when gsutil -m can be advantageous.' |
| 1091 % sys.argv[1]) + '\n') |
| 1092 |
| 1093 # pylint: disable=g-doc-args |
| 989 def _SequentialApply(self, func, args_iterator, exception_handler, caller_id, | 1094 def _SequentialApply(self, func, args_iterator, exception_handler, caller_id, |
| 990 arg_checker, should_return_results, fail_on_error): | 1095 arg_checker, should_return_results, fail_on_error): |
| 1096 """Performs all function calls sequentially in the current thread. |
| 1097 |
| 1098 No other threads or processes will be spawned. This degraded functionality |
| 1099 is used when the multiprocessing module is not available or the user |
| 1100 requests only one thread and one process. |
| 991 """ | 1101 """ |
| 992 Perform all function calls sequentially in the current thread. No other | |
| 993 threads or processes will be spawned. This degraded functionality is only | |
| 994 for use when the multiprocessing module is not available for some reason. | |
| 995 """ | |
| 996 | |
| 997 # Create a WorkerThread to handle all of the logic needed to actually call | 1102 # Create a WorkerThread to handle all of the logic needed to actually call |
| 998 # the function. Note that this thread will never be started, and all work | 1103 # the function. Note that this thread will never be started, and all work |
| 999 # is done in the current thread. | 1104 # is done in the current thread. |
| 1000 worker_thread = WorkerThread(None, False) | 1105 worker_thread = WorkerThread(None, False) |
| 1001 args_iterator = iter(args_iterator) | 1106 args_iterator = iter(args_iterator) |
| 1107 # Count of sequential calls that have been made. Used for producing |
| 1108 # suggestion to use gsutil -m. |
| 1109 sequential_call_count = 0 |
| 1002 while True: | 1110 while True: |
| 1003 | 1111 |
| 1004 # Try to get the next argument, handling any exceptions that arise. | 1112 # Try to get the next argument, handling any exceptions that arise. |
| 1005 try: | 1113 try: |
| 1006 args = args_iterator.next() | 1114 args = args_iterator.next() |
| 1007 except StopIteration, e: | 1115 except StopIteration, e: |
| 1008 break | 1116 break |
| 1009 except Exception, e: | 1117 except Exception, e: # pylint: disable=broad-except |
| 1118 _IncrementFailureCount() |
| 1010 if fail_on_error: | 1119 if fail_on_error: |
| 1011 raise | 1120 raise |
| 1012 else: | 1121 else: |
| 1013 try: | 1122 try: |
| 1014 exception_handler(self, e) | 1123 exception_handler(self, e) |
| 1015 except Exception, e1: | 1124 except Exception, _: # pylint: disable=broad-except |
| 1016 self.logger.debug( | 1125 self.logger.debug( |
| 1017 'Caught exception while handling exception for %s:\n%s', | 1126 'Caught exception while handling exception for %s:\n%s', |
| 1018 func, traceback.format_exc()) | 1127 func, traceback.format_exc()) |
| 1019 continue | 1128 continue |
| 1129 |
| 1130 sequential_call_count += 1 |
| 1131 if sequential_call_count == OFFER_GSUTIL_M_SUGGESTION_THRESHOLD: |
| 1132 # Output suggestion near beginning of run, so user sees it early and can |
| 1133 # ^C and try gsutil -m. |
| 1134 self._MaybeSuggestGsutilDashM() |
| 1020 if arg_checker(self, args): | 1135 if arg_checker(self, args): |
| 1021 # Now that we actually have the next argument, perform the task. | 1136 # Now that we actually have the next argument, perform the task. |
| 1022 task = Task(func, args, caller_id, exception_handler, | 1137 task = Task(func, args, caller_id, exception_handler, |
| 1023 should_return_results, arg_checker, fail_on_error) | 1138 should_return_results, arg_checker, fail_on_error) |
| 1024 worker_thread.PerformTask(task, self) | 1139 worker_thread.PerformTask(task, self) |
| 1140 if sequential_call_count >= gslib.util.GetTermLines(): |
| 1141 # Output suggestion at end of long run, in case user missed it at the |
| 1142 # start and it scrolled off-screen. |
| 1143 self._MaybeSuggestGsutilDashM() |
| 1025 | 1144 |
| 1145 # pylint: disable=g-doc-args |
| 1026 def _ParallelApply(self, func, args_iterator, exception_handler, caller_id, | 1146 def _ParallelApply(self, func, args_iterator, exception_handler, caller_id, |
| 1027 arg_checker, parallel_operations_override, process_count, | 1147 arg_checker, process_count, thread_count, |
| 1028 thread_count, should_return_results, fail_on_error): | 1148 should_return_results, fail_on_error): |
| 1029 """ | 1149 """Dispatches input arguments across a thread/process pool. |
| 1030 Dispatch input arguments across a pool of parallel OS | 1150 |
| 1031 processes and/or Python threads, based on options (-m or not) | 1151 Pools are composed of parallel OS processes and/or Python threads, |
| 1032 and settings in the user's config file. If non-parallel mode | 1152 based on options (-m or not) and settings in the user's config file. |
| 1033 or only one OS process requested, execute requests sequentially | 1153 |
| 1034 in the current OS process. | 1154 If only one OS process is requested/available, dispatch requests across |
| 1035 | 1155 threads in the current OS process. |
| 1156 |
| 1036 In the multi-process case, we will create one pool of worker processes for | 1157 In the multi-process case, we will create one pool of worker processes for |
| 1037 each level of the tree of recursive calls to Apply. E.g., if A calls | 1158 each level of the tree of recursive calls to Apply. E.g., if A calls |
| 1038 Apply(B), and B ultimately calls Apply(C) followed by Apply(D), then we | 1159 Apply(B), and B ultimately calls Apply(C) followed by Apply(D), then we |
| 1039 will only create two sets of worker processes - B will execute in the first, | 1160 will only create two sets of worker processes - B will execute in the first, |
| 1040 and C and D will execute in the second. If C is then changed to call | 1161 and C and D will execute in the second. If C is then changed to call |
| 1041 Apply(E) and D is changed to call Apply(F), then we will automatically | 1162 Apply(E) and D is changed to call Apply(F), then we will automatically |
| 1042 create a third set of processes (lazily, when needed) that will be used to | 1163 create a third set of processes (lazily, when needed) that will be used to |
| 1043 execute calls to E and F. This might look something like: | 1164 execute calls to E and F. This might look something like: |
| 1044 | 1165 |
| 1045 Pool1 Executes: B | 1166 Pool1 Executes: B |
| 1046 / \ | 1167 / \ |
| 1047 Pool2 Executes: C D | 1168 Pool2 Executes: C D |
| 1048 / \ | 1169 / \ |
| 1049 Pool3 Executes: E F | 1170 Pool3 Executes: E F |
| 1050 | 1171 |
| 1051 Apply's parallelism is generally broken up into 4 cases: | 1172 Apply's parallelism is generally broken up into 4 cases: |
| 1052 - If process_count == thread_count == 1, then all tasks will be executed | 1173 - If process_count == thread_count == 1, then all tasks will be executed |
| 1053 in this thread. | 1174 by _SequentialApply. |
| 1054 - If process_count > 1 and thread_count == 1, then the main thread will | 1175 - If process_count > 1 and thread_count == 1, then the main thread will |
| 1055 create a new pool of processes (if they don't already exist) and each of | 1176 create a new pool of processes (if they don't already exist) and each of |
| 1056 those processes will execute the tasks in a single thread. | 1177 those processes will execute the tasks in a single thread. |
| 1057 - If process_count == 1 and thread_count > 1, then this process will create | 1178 - If process_count == 1 and thread_count > 1, then this process will create |
| 1058 a new pool of threads to execute the tasks. | 1179 a new pool of threads to execute the tasks. |
| 1059 - If process_count > 1 and thread_count > 1, then the main thread will | 1180 - If process_count > 1 and thread_count > 1, then the main thread will |
| 1060 create a new pool of processes (if they don't already exist) and each of | 1181 create a new pool of processes (if they don't already exist) and each of |
| 1061 those processes will, upon creation, create a pool of threads to | 1182 those processes will, upon creation, create a pool of threads to |
| 1062 execute the tasks. | 1183 execute the tasks. |
| 1063 | 1184 |
| 1064 Args: | 1185 Args: |
| 1065 caller_id: The caller ID unique to this call to command.Apply. | 1186 caller_id: The caller ID unique to this call to command.Apply. |
| 1066 See command.Apply for description of other arguments. | 1187 See command.Apply for description of other arguments. |
| 1067 """ | 1188 """ |
| 1068 is_main_thread = self.recursive_apply_level == 0 | 1189 is_main_thread = self.recursive_apply_level == 0 |
| 1069 | 1190 |
| 1070 # Catch ^C under Linux/MacOs so we can do cleanup before exiting. | 1191 # Catch ^C under Linux/MacOS so we can do cleanup before exiting. |
| 1071 if not IS_WINDOWS and is_main_thread: | 1192 if not IS_WINDOWS and is_main_thread: |
| 1072 signal.signal(signal.SIGINT, self._HandleMultiProcessingControlC) | 1193 signal.signal(signal.SIGINT, self._HandleMultiProcessingControlC) |
| 1073 | 1194 |
| 1074 if not len(task_queues): | 1195 if not task_queues: |
| 1075 # The process we create will need to access the next recursive level | 1196 # The process we create will need to access the next recursive level |
| 1076 # of task queues if it makes a call to Apply, so we always keep around | 1197 # of task queues if it makes a call to Apply, so we always keep around |
| 1077 # one more queue than we know we need. OTOH, if we don't create a new | 1198 # one more queue than we know we need. OTOH, if we don't create a new |
| 1078 # process, the existing process still needs a task queue to use. | 1199 # process, the existing process still needs a task queue to use. |
| 1079 task_queues.append(_NewMultiprocessingQueue()) | 1200 task_queues.append(_NewMultiprocessingQueue()) |
| 1080 | 1201 |
| 1081 if process_count > 1: # Handle process pool creation. | 1202 if process_count > 1: # Handle process pool creation. |
| 1082 # Check whether this call will need a new set of workers. | 1203 # Check whether this call will need a new set of workers. |
| 1083 with need_pool_or_done_cond: | 1204 |
| 1205 # Each worker must acquire a shared lock before notifying the main thread |
| 1206 # that it needs a new worker pool, so that at most one worker asks for |
| 1207 # a new worker pool at once. |
| 1208 try: |
| 1209 if not is_main_thread: |
| 1210 worker_checking_level_lock.acquire() |
| 1084 if self.recursive_apply_level >= current_max_recursive_level.value: | 1211 if self.recursive_apply_level >= current_max_recursive_level.value: |
| 1085 # Only the main thread is allowed to create new processes - otherwise, | 1212 with need_pool_or_done_cond: |
| 1086 # we will run into some Python bugs. | 1213 # Only the main thread is allowed to create new processes - |
| 1087 if is_main_thread: | 1214 # otherwise, we will run into some Python bugs. |
| 1088 self._CreateNewConsumerPool(process_count, thread_count) | 1215 if is_main_thread: |
| 1089 else: | 1216 self._CreateNewConsumerPool(process_count, thread_count) |
| 1090 # Notify the main thread that we need a new consumer pool. | 1217 else: |
| 1091 new_pool_needed.value = 1 | 1218 # Notify the main thread that we need a new consumer pool. |
| 1092 need_pool_or_done_cond.notify_all() | 1219 new_pool_needed.value = 1 |
| 1093 # The main thread will notify us when it finishes. | 1220 need_pool_or_done_cond.notify_all() |
| 1094 need_pool_or_done_cond.wait() | 1221 # The main thread will notify us when it finishes. |
| 1222 need_pool_or_done_cond.wait() |
| 1223 finally: |
| 1224 if not is_main_thread: |
| 1225 worker_checking_level_lock.release() |
| 1095 | 1226 |
| 1096 # If we're running in this process, create a separate task queue. Otherwise, | 1227 # If we're running in this process, create a separate task queue. Otherwise, |
| 1097 # if Apply has already been called with process_count > 1, then there will | 1228 # if Apply has already been called with process_count > 1, then there will |
| 1098 # be consumer pools trying to use our processes. | 1229 # be consumer pools trying to use our processes. |
| 1099 if process_count > 1: | 1230 if process_count > 1: |
| 1100 task_queue = task_queues[self.recursive_apply_level] | 1231 task_queue = task_queues[self.recursive_apply_level] |
| 1101 else: | 1232 else: |
| 1102 task_queue = _NewMultiprocessingQueue() | 1233 task_queue = _NewMultiprocessingQueue() |
| 1103 | 1234 |
| 1104 # Kick off a producer thread to throw tasks in the global task queue. We | 1235 # Kick off a producer thread to throw tasks in the global task queue. We |
| (...skipping 20 matching lines...) Expand all Loading... |
| 1125 # pools, or we the wakeup call was meant for someone else. It's | 1256 # pools, or the wakeup call was meant for someone else. It's |
| 1126 # impossible for both conditions to be true, since the main thread is | 1257 # impossible for both conditions to be true, since the main thread is |
| 1127 # blocked on any other ongoing calls to Apply, and a thread would not | 1258 # blocked on any other ongoing calls to Apply, and a thread would not |
| 1128 # ask for a new consumer pool unless it had more work to do. | 1259 # ask for a new consumer pool unless it had more work to do. |
| 1129 if call_completed_map[caller_id]: | 1260 if call_completed_map[caller_id]: |
| 1130 break | 1261 break |
| 1131 elif is_main_thread and new_pool_needed.value: | 1262 elif is_main_thread and new_pool_needed.value: |
| 1132 new_pool_needed.value = 0 | 1263 new_pool_needed.value = 0 |
| 1133 self._CreateNewConsumerPool(process_count, thread_count) | 1264 self._CreateNewConsumerPool(process_count, thread_count) |
| 1134 need_pool_or_done_cond.notify_all() | 1265 need_pool_or_done_cond.notify_all() |
| 1135 | 1266 |
| 1136 # Note that we must check the above conditions before the wait() call; | 1267 # Note that we must check the above conditions before the wait() call; |
| 1137 # otherwise, the notification can happen before we start waiting, in | 1268 # otherwise, the notification can happen before we start waiting, in |
| 1138 # which case we'll block forever. | 1269 # which case we'll block forever. |
| 1139 need_pool_or_done_cond.wait() | 1270 need_pool_or_done_cond.wait() |
| 1140 else: # Using a single process. | 1271 else: # Using a single process. |
| 1141 shard = 0 | 1272 self._ApplyThreads(thread_count, process_count, |
| 1142 self._ApplyThreads(thread_count, self.recursive_apply_level, shard, | 1273 self.recursive_apply_level, |
| 1143 is_blocking_call=True, task_queue=task_queue) | 1274 is_blocking_call=True, task_queue=task_queue) |
| 1144 | 1275 |
| 1145 # We encountered an exception from the producer thread before any arguments | 1276 # We encountered an exception from the producer thread before any arguments |
| 1146 # were enqueued, but it wouldn't have been propagated, so we'll now | 1277 # were enqueued, but it wouldn't have been propagated, so we'll now |
| 1147 # explicitly raise it here. | 1278 # explicitly raise it here. |
| 1148 if producer_thread.unknown_exception: | 1279 if producer_thread.unknown_exception: |
| 1280 # pylint: disable=raising-bad-type |
| 1149 raise producer_thread.unknown_exception | 1281 raise producer_thread.unknown_exception |
| 1150 | 1282 |
| 1151 # We encountered an exception from the producer thread while iterating over | 1283 # We encountered an exception from the producer thread while iterating over |
| 1152 # the arguments, so raise it here if we're meant to fail on error. | 1284 # the arguments, so raise it here if we're meant to fail on error. |
| 1153 if producer_thread.iterator_exception and fail_on_error: | 1285 if producer_thread.iterator_exception and fail_on_error: |
| 1286 # pylint: disable=raising-bad-type |
| 1154 raise producer_thread.iterator_exception | 1287 raise producer_thread.iterator_exception |
| 1155 | 1288 |
| 1156 def _ApplyThreads(self, thread_count, recursive_apply_level, shard, | 1289 def _ApplyThreads(self, thread_count, process_count, recursive_apply_level, |
| 1157 is_blocking_call=False, task_queue=None): | 1290 is_blocking_call=False, task_queue=None): |
| 1158 """ | 1291 """Assigns the work from the multi-process global task queue. |
| 1159 Assigns the work from the global task queue shared among all processes | 1292 |
| 1160 to an individual process, for later consumption either by the WorkerThreads | 1293 Work is assigned to an individual process for later consumption either by |
| 1161 or in this thread if thread_count == 1. | 1294 the WorkerThreads or (if thread_count == 1) this thread. |
| 1162 | 1295 |
| 1163 Args: | 1296 Args: |
| 1164 thread_count: The number of threads used to perform the work. If 1, then | 1297 thread_count: The number of threads used to perform the work. If 1, then |
| 1165 perform all work in this thread. | 1298 perform all work in this thread. |
| 1299 process_count: The number of processes used to perform the work. |
| 1166 recursive_apply_level: The depth in the tree of recursive calls to Apply | 1300 recursive_apply_level: The depth in the tree of recursive calls to Apply |
| 1167 of this thread. | 1301 of this thread. |
| 1168 shard: Assigned subset (shard number) for this function. | |
| 1169 is_blocking_call: True iff the call to Apply is blocked on this call | 1302 is_blocking_call: True iff the call to Apply is blocked on this call |
| 1170 (which is true iff process_count == 1), implying that | 1303 (which is true iff process_count == 1), implying that |
| 1171 _ApplyThreads must behave as a blocking call. | 1304 _ApplyThreads must behave as a blocking call. |
| 1172 """ | 1305 """ |
| 1173 self._ResetConnectionPool() | 1306 self._ResetConnectionPool() |
| 1174 | 1307 self.recursive_apply_level = recursive_apply_level |
| 1308 |
| 1175 task_queue = task_queue or task_queues[recursive_apply_level] | 1309 task_queue = task_queue or task_queues[recursive_apply_level] |
| 1176 | 1310 |
| 1311 assert thread_count * process_count > 1, ( |
| 1312 'Invalid state, calling command._ApplyThreads with only one thread ' |
| 1313 'and process.') |
| 1177 if thread_count > 1: | 1314 if thread_count > 1: |
| 1178 worker_pool = WorkerPool(thread_count) | 1315 worker_pool = WorkerPool( |
| 1179 else: | 1316 thread_count, self.logger, |
| 1180 worker_pool = SameThreadWorkerPool(self) | 1317 bucket_storage_uri_class=self.bucket_storage_uri_class, |
| 1318 gsutil_api_map=self.gsutil_api_map, debug=self.debug) |
| 1319 elif process_count > 1: |
| 1320 worker_pool = SameThreadWorkerPool( |
| 1321 self, bucket_storage_uri_class=self.bucket_storage_uri_class, |
| 1322 gsutil_api_map=self.gsutil_api_map, debug=self.debug) |
| 1181 | 1323 |
| 1182 num_enqueued = 0 | 1324 num_enqueued = 0 |
| 1183 while True: | 1325 while True: |
| 1184 task = task_queue.get() | 1326 task = task_queue.get() |
| 1185 if task.args != ZERO_TASKS_TO_DO_ARGUMENT: | 1327 if task.args != ZERO_TASKS_TO_DO_ARGUMENT: |
| 1186 # If we have no tasks to do and we're performing a blocking call, we | 1328 # If we have no tasks to do and we're performing a blocking call, we |
| 1187 # need a special signal to tell us to stop - otherwise, we block on | 1329 # need a special signal to tell us to stop - otherwise, we block on |
| 1188 # the call to task_queue.get() forever. | 1330 # the call to task_queue.get() forever. |
| 1189 worker_pool.AddTask(task) | 1331 worker_pool.AddTask(task) |
| 1190 num_enqueued += 1 | 1332 num_enqueued += 1 |
| (...skipping 14 matching lines...) Expand all Loading... |
| 1205 # notified before we grabbed the lock. | 1347 # notified before we grabbed the lock. |
| 1206 return | 1348 return |
| 1207 need_pool_or_done_cond.wait() | 1349 need_pool_or_done_cond.wait() |
| 1208 | 1350 |
| 1209 | 1351 |
| 1210 # Below here lie classes and functions related to controlling the flow of tasks | 1352 # Below here lie classes and functions related to controlling the flow of tasks |
| 1211 # between various threads and processes. | 1353 # between various threads and processes. |
| 1212 | 1354 |
| 1213 | 1355 |
| 1214 class _ConsumerPool(object): | 1356 class _ConsumerPool(object): |
| 1357 |
| 1215 def __init__(self, processes, task_queue): | 1358 def __init__(self, processes, task_queue): |
| 1216 self.processes = processes | 1359 self.processes = processes |
| 1217 self.task_queue = task_queue | 1360 self.task_queue = task_queue |
| 1218 | 1361 |
| 1219 def ShutDown(self): | 1362 def ShutDown(self): |
| 1220 for process in self.processes: | 1363 for process in self.processes: |
| 1221 KillProcess(process.pid) | 1364 KillProcess(process.pid) |
| 1222 | 1365 |
| 1223 | 1366 |
| 1224 def KillProcess(pid): | 1367 def KillProcess(pid): |
| 1225 # os.kill doesn't work in 2.X or 3.Y on Windows for any X < 7 or Y < 2. | 1368 # os.kill doesn't work in 2.X or 3.Y on Windows for any X < 7 or Y < 2. |
| 1226 if IS_WINDOWS and ((2, 6) <= sys.version_info[:3] < (2, 7) or | 1369 if IS_WINDOWS and ((2, 6) <= sys.version_info[:3] < (2, 7) or |
| 1227 (3, 0) <= sys.version_info[:3] < (3, 2)): | 1370 (3, 0) <= sys.version_info[:3] < (3, 2)): |
| 1228 try: | 1371 try: |
| 1229 kernel32 = ctypes.windll.kernel32 | 1372 kernel32 = ctypes.windll.kernel32 |
| 1230 handle = kernel32.OpenProcess(1, 0, pid) | 1373 handle = kernel32.OpenProcess(1, 0, pid) |
| 1231 kernel32.TerminateProcess(handle, 0) | 1374 kernel32.TerminateProcess(handle, 0) |
| 1232 except: | 1375 except: # pylint: disable=bare-except |
| 1233 pass | 1376 pass |
| 1234 else: | 1377 else: |
| 1235 try: | 1378 try: |
| 1236 os.kill(pid, signal.SIGKILL) | 1379 os.kill(pid, signal.SIGKILL) |
| 1237 except OSError: | 1380 except OSError: |
| 1238 pass | 1381 pass |
| 1239 | 1382 |
| 1240 | 1383 |
| 1241 class Task(namedtuple('Task', | 1384 class Task(namedtuple('Task', ( |
| 1242 'func args caller_id exception_handler should_return_results arg_checker ' + | 1385 'func args caller_id exception_handler should_return_results arg_checker ' |
| 1243 'fail_on_error')): | 1386 'fail_on_error'))): |
| 1244 """ | 1387 """Task class representing work to be completed. |
| 1388 |
| 1245 Args: | 1389 Args: |
| 1246 func: The function to be executed. | 1390 func: The function to be executed. |
| 1247 args: The arguments to func. | 1391 args: The arguments to func. |
| 1248 caller_id: The globally-unique caller ID corresponding to the Apply call. | 1392 caller_id: The globally-unique caller ID corresponding to the Apply call. |
| 1249 exception_handler: The exception handler to use if the call to func fails. | 1393 exception_handler: The exception handler to use if the call to func fails. |
| 1250 should_return_results: True iff the results of this function should be | 1394 should_return_results: True iff the results of this function should be |
| 1251 returned from the Apply call. | 1395 returned from the Apply call. |
| 1252 arg_checker: Used to determine whether we should process the current | 1396 arg_checker: Used to determine whether we should process the current |
| 1253 argument or simply skip it. Also handles any logging that | 1397 argument or simply skip it. Also handles any logging that |
| 1254 is specific to a particular type of argument. | 1398 is specific to a particular type of argument. |
| 1255 fail_on_error: If true, then raise any exceptions encountered when | 1399 fail_on_error: If true, then raise any exceptions encountered when |
| 1256 executing func. This is only applicable in the case of | 1400 executing func. This is only applicable in the case of |
| 1257 process_count == thread_count == 1. | 1401 process_count == thread_count == 1. |
| 1258 """ | 1402 """ |
| 1259 pass | 1403 pass |
| 1260 | 1404 |
| 1261 | 1405 |
| 1262 class ProducerThread(threading.Thread): | 1406 class ProducerThread(threading.Thread): |
| 1263 """Thread used to enqueue work for other processes and threads.""" | 1407 """Thread used to enqueue work for other processes and threads.""" |
| 1408 |
| 1264 def __init__(self, cls, args_iterator, caller_id, func, task_queue, | 1409 def __init__(self, cls, args_iterator, caller_id, func, task_queue, |
| 1265 should_return_results, exception_handler, arg_checker, | 1410 should_return_results, exception_handler, arg_checker, |
| 1266 fail_on_error): | 1411 fail_on_error): |
| 1267 """ | 1412 """Initializes the producer thread. |
| 1413 |
| 1268 Args: | 1414 Args: |
| 1269 cls: Instance of Command for which this ProducerThread was created. | 1415 cls: Instance of Command for which this ProducerThread was created. |
| 1270 args_iterator: Iterable collection of arguments to be put into the | 1416 args_iterator: Iterable collection of arguments to be put into the |
| 1271 work queue. | 1417 work queue. |
| 1272 caller_id: Globally-unique caller ID corresponding to this call to Apply. | 1418 caller_id: Globally-unique caller ID corresponding to this call to Apply. |
| 1273 func: The function to be called on each element of args_iterator. | 1419 func: The function to be called on each element of args_iterator. |
| 1274 task_queue: The queue into which tasks will be put, to later be consumed | 1420 task_queue: The queue into which tasks will be put, to later be consumed |
| 1275 by Command._ApplyThreads. | 1421 by Command._ApplyThreads. |
| 1276 should_return_results: True iff the results for this call to command.Apply | 1422 should_return_results: True iff the results for this call to command.Apply |
| 1277 were requested. | 1423 were requested. |
| (...skipping 26 matching lines...) Expand all Loading... |
| 1304 num_tasks = 0 | 1450 num_tasks = 0 |
| 1305 cur_task = None | 1451 cur_task = None |
| 1306 last_task = None | 1452 last_task = None |
| 1307 try: | 1453 try: |
| 1308 args_iterator = iter(self.args_iterator) | 1454 args_iterator = iter(self.args_iterator) |
| 1309 while True: | 1455 while True: |
| 1310 try: | 1456 try: |
| 1311 args = args_iterator.next() | 1457 args = args_iterator.next() |
| 1312 except StopIteration, e: | 1458 except StopIteration, e: |
| 1313 break | 1459 break |
| 1314 except Exception, e: | 1460 except Exception, e: # pylint: disable=broad-except |
| 1461 _IncrementFailureCount() |
| 1315 if self.fail_on_error: | 1462 if self.fail_on_error: |
| 1316 self.iterator_exception = e | 1463 self.iterator_exception = e |
| 1317 raise | 1464 raise |
| 1318 else: | 1465 else: |
| 1319 try: | 1466 try: |
| 1320 self.exception_handler(self.cls, e) | 1467 self.exception_handler(self.cls, e) |
| 1321 except Exception, e1: | 1468 except Exception, _: # pylint: disable=broad-except |
| 1322 self.cls.logger.debug( | 1469 self.cls.logger.debug( |
| 1323 'Caught exception while handling exception for %s:\n%s', | 1470 'Caught exception while handling exception for %s:\n%s', |
| 1324 self.func, traceback.format_exc()) | 1471 self.func, traceback.format_exc()) |
| 1325 self.shared_variables_updater.Update(self.caller_id, self.cls) | 1472 self.shared_variables_updater.Update(self.caller_id, self.cls) |
| 1326 continue | 1473 continue |
| 1327 | 1474 |
| 1328 if self.arg_checker(self.cls, args): | 1475 if self.arg_checker(self.cls, args): |
| 1329 num_tasks += 1 | 1476 num_tasks += 1 |
| 1330 last_task = cur_task | 1477 last_task = cur_task |
| 1331 cur_task = Task(self.func, args, self.caller_id, | 1478 cur_task = Task(self.func, args, self.caller_id, |
| 1332 self.exception_handler, self.should_return_results, | 1479 self.exception_handler, self.should_return_results, |
| 1333 self.arg_checker, self.fail_on_error) | 1480 self.arg_checker, self.fail_on_error) |
| 1334 if last_task: | 1481 if last_task: |
| 1335 self.task_queue.put(last_task, self.caller_id) | 1482 self.task_queue.put(last_task) |
| 1336 except Exception, e: | 1483 except Exception, e: # pylint: disable=broad-except |
| 1337 # This will also catch any exception raised due to an error in the | 1484 # This will also catch any exception raised due to an error in the |
| 1338 # iterator when fail_on_error is set, so check that we failed for some | 1485 # iterator when fail_on_error is set, so check that we failed for some |
| 1339 # other reason before claiming that we had an unknown exception. | 1486 # other reason before claiming that we had an unknown exception. |
| 1340 if not self.iterator_exception: | 1487 if not self.iterator_exception: |
| 1341 self.unknown_exception = e | 1488 self.unknown_exception = e |
| 1342 finally: | 1489 finally: |
| 1343 # We need to make sure to update total_tasks[caller_id] before we enqueue | 1490 # We need to make sure to update total_tasks[caller_id] before we enqueue |
| 1344 # the last task. Otherwise, a worker can retrieve the last task and | 1491 # the last task. Otherwise, a worker can retrieve the last task and |
| 1345 # complete it, then check total_tasks and determine that we're not done | 1492 # complete it, then check total_tasks and determine that we're not done |
| 1346 # producing all before we update total_tasks. This approach forces workers | 1493 # producing all before we update total_tasks. This approach forces workers |
| 1347 # to wait on the last task until after we've updated total_tasks. | 1494 # to wait on the last task until after we've updated total_tasks. |
| 1348 total_tasks[self.caller_id] = num_tasks | 1495 total_tasks[self.caller_id] = num_tasks |
| 1349 if not cur_task: | 1496 if not cur_task: |
| 1350 # This happens if there were zero arguments to be put in the queue. | 1497 # This happens if there were zero arguments to be put in the queue. |
| 1351 cur_task = Task(None, ZERO_TASKS_TO_DO_ARGUMENT, self.caller_id, | 1498 cur_task = Task(None, ZERO_TASKS_TO_DO_ARGUMENT, self.caller_id, |
| 1352 None, None, None, None) | 1499 None, None, None, None) |
| 1353 self.task_queue.put(cur_task, self.caller_id) | 1500 self.task_queue.put(cur_task) |
| 1354 | 1501 |
| 1355 # It's possible that the workers finished before we updated total_tasks, | 1502 # It's possible that the workers finished before we updated total_tasks, |
| 1356 # so we need to check here as well. | 1503 # so we need to check here as well. |
| 1357 _NotifyIfDone(self.caller_id, | 1504 _NotifyIfDone(self.caller_id, |
| 1358 caller_id_finished_count.get(self.caller_id)) | 1505 caller_id_finished_count.Get(self.caller_id)) |
| 1359 | 1506 |
| 1360 | 1507 |
| 1361 class SameThreadWorkerPool(object): | 1508 class SameThreadWorkerPool(object): |
| 1362 """Behaves like a WorkerPool, but used for the single-threaded case.""" | 1509 """Behaves like a WorkerPool, but used for the single-threaded case.""" |
| 1363 def __init__(self, cls): | 1510 |
| 1511 def __init__(self, cls, bucket_storage_uri_class=None, |
| 1512 gsutil_api_map=None, debug=0): |
| 1364 self.cls = cls | 1513 self.cls = cls |
| 1365 self.worker_thread = WorkerThread(None) | 1514 self.worker_thread = WorkerThread( |
| 1366 | 1515 None, cls.logger, |
| 1516 bucket_storage_uri_class=bucket_storage_uri_class, |
| 1517 gsutil_api_map=gsutil_api_map, debug=debug) |
| 1518 |
| 1367 def AddTask(self, task): | 1519 def AddTask(self, task): |
| 1368 self.worker_thread.PerformTask(task, self.cls) | 1520 self.worker_thread.PerformTask(task, self.cls) |
| 1369 | 1521 |
| 1370 | 1522 |
| 1371 class WorkerPool(object): | 1523 class WorkerPool(object): |
| 1372 """Pool of worker threads to which tasks can be added.""" | 1524 """Pool of worker threads to which tasks can be added.""" |
| 1373 def __init__(self, thread_count): | 1525 |
| 1526 def __init__(self, thread_count, logger, bucket_storage_uri_class=None, |
| 1527 gsutil_api_map=None, debug=0): |
| 1374 self.task_queue = _NewThreadsafeQueue() | 1528 self.task_queue = _NewThreadsafeQueue() |
| 1375 self.threads = [] | 1529 self.threads = [] |
| 1376 for _ in range(thread_count): | 1530 for _ in range(thread_count): |
| 1377 worker_thread = WorkerThread(self.task_queue) | 1531 worker_thread = WorkerThread( |
| 1532 self.task_queue, logger, |
| 1533 bucket_storage_uri_class=bucket_storage_uri_class, |
| 1534 gsutil_api_map=gsutil_api_map, debug=debug) |
| 1378 self.threads.append(worker_thread) | 1535 self.threads.append(worker_thread) |
| 1379 worker_thread.start() | 1536 worker_thread.start() |
| 1380 | 1537 |
| 1381 def AddTask(self, task): | 1538 def AddTask(self, task): |
| 1382 self.task_queue.put(task) | 1539 self.task_queue.put(task) |
| 1383 | 1540 |
| 1384 | 1541 |
| 1385 class WorkerThread(threading.Thread): | 1542 class WorkerThread(threading.Thread): |
| 1386 """ | 1543 """Thread where all the work will be performed. |
| 1387 This thread is where all of the work will be performed in actually making the | 1544 |
| 1388 function calls for Apply. It takes care of all error handling, return value | 1545 This makes the function calls for Apply and takes care of all error handling, |
| 1389 propagation, and shared_vars. | 1546 return value propagation, and shared_vars. |
| 1390 | 1547 |
| 1391 Note that this thread is NOT started upon instantiation because the function- | 1548 Note that this thread is NOT started upon instantiation because the function- |
| 1392 calling logic is also used in the single-threaded case. | 1549 calling logic is also used in the single-threaded case. |
| 1393 """ | 1550 """ |
| 1394 def __init__(self, task_queue, multiprocessing_is_available=True): | 1551 |
| 1395 """ | 1552 def __init__(self, task_queue, logger, bucket_storage_uri_class=None, |
| 1553 gsutil_api_map=None, debug=0): |
| 1554 """Initializes the worker thread. |
| 1555 |
| 1396 Args: | 1556 Args: |
| 1397 task_queue: The thread-safe queue from which this thread should obtain | 1557 task_queue: The thread-safe queue from which this thread should obtain |
| 1398 its work. | 1558 its work. |
| 1399 multiprocessing_is_available: False iff the multiprocessing module is not | 1559 logger: Logger to use for this thread. |
| 1400 available, in which case we're using | 1560 bucket_storage_uri_class: Class to instantiate for cloud StorageUris. |
| 1401 _SequentialApply. | 1561 Settable for testing/mocking. |
| 1562 gsutil_api_map: Map of providers and API selector tuples to api classes |
| 1563 which can be used to communicate with those providers. |
| 1564 Used for instantiating the CloudApiDelegator class. |
| 1565 debug: debug level for the CloudApiDelegator class. |
| 1402 """ | 1566 """ |
| 1403 super(WorkerThread, self).__init__() | 1567 super(WorkerThread, self).__init__() |
| 1404 self.task_queue = task_queue | 1568 self.task_queue = task_queue |
| 1405 self.multiprocessing_is_available = multiprocessing_is_available | |
| 1406 self.daemon = True | 1569 self.daemon = True |
| 1407 self.cached_classes = {} | 1570 self.cached_classes = {} |
| 1408 self.shared_vars_updater = _SharedVariablesUpdater() | 1571 self.shared_vars_updater = _SharedVariablesUpdater() |
| 1409 | 1572 |
| 1573 self.thread_gsutil_api = None |
| 1574 if bucket_storage_uri_class and gsutil_api_map: |
| 1575 self.thread_gsutil_api = CloudApiDelegator( |
| 1576 bucket_storage_uri_class, gsutil_api_map, logger, debug=debug) |
| 1577 |
| 1410 def PerformTask(self, task, cls): | 1578 def PerformTask(self, task, cls): |
| 1411 """ | 1579 """Makes the function call for a task. |
| 1412 Makes the function call for a task. | |
| 1413 | 1580 |
| 1414 Args: | 1581 Args: |
| 1415 task: The Task to perform. | 1582 task: The Task to perform. |
| 1416 cls: The instance of a class which gives context to the functions called | 1583 cls: The instance of a class which gives context to the functions called |
| 1417 by the Task's function. E.g., see _SetAclFuncWrapper. | 1584 by the Task's function. E.g., see SetAclFuncWrapper. |
| 1418 """ | 1585 """ |
| 1419 caller_id = task.caller_id | 1586 caller_id = task.caller_id |
| 1420 try: | 1587 try: |
| 1421 results = task.func(cls, task.args) | 1588 results = task.func(cls, task.args, thread_state=self.thread_gsutil_api) |
| 1422 if task.should_return_results: | 1589 if task.should_return_results: |
| 1423 global_return_values_map.update(caller_id, [results], default_value=[]) | 1590 global_return_values_map.Update(caller_id, [results], default_value=[]) |
| 1424 except Exception, e: | 1591 except Exception, e: # pylint: disable=broad-except |
| 1592 _IncrementFailureCount() |
| 1425 if task.fail_on_error: | 1593 if task.fail_on_error: |
| 1426 raise # Only happens for single thread and process case. | 1594 raise # Only happens for single thread and process case. |
| 1427 else: | 1595 else: |
| 1428 try: | 1596 try: |
| 1429 task.exception_handler(cls, e) | 1597 task.exception_handler(cls, e) |
| 1430 except Exception, e1: | 1598 except Exception, _: # pylint: disable=broad-except |
| 1431 # Don't allow callers to throw exceptions here and kill the worker | 1599 # Don't allow callers to raise exceptions here and kill the worker |
| 1432 # threads. | 1600 # threads. |
| 1433 cls.logger.debug( | 1601 cls.logger.debug( |
| 1434 'Caught exception while handling exception for %s:\n%s', | 1602 'Caught exception while handling exception for %s:\n%s', |
| 1435 task, traceback.format_exc()) | 1603 task, traceback.format_exc()) |
| 1436 finally: | 1604 finally: |
| 1437 self.shared_vars_updater.Update(caller_id, cls) | 1605 self.shared_vars_updater.Update(caller_id, cls) |
| 1438 | 1606 |
| 1439 # Even if we encounter an exception, we still need to claim that that | 1607 # Even if we encounter an exception, we still need to claim that |
| 1440 # the function finished executing. Otherwise, we won't know when to | 1608 # the function finished executing. Otherwise, we won't know when to |
| 1441 # stop waiting and return results. | 1609 # stop waiting and return results. |
| 1442 num_done = caller_id_finished_count.update(caller_id, 1) | 1610 num_done = caller_id_finished_count.Update(caller_id, 1) |
| 1443 if self.multiprocessing_is_available: | 1611 |
| 1444 _NotifyIfDone(caller_id, num_done) | 1612 if cls.multiprocessing_is_available: |
| 1613 _NotifyIfDone(caller_id, num_done) |
| 1445 | 1614 |
| 1446 def run(self): | 1615 def run(self): |
| 1447 while True: | 1616 while True: |
| 1448 task = self.task_queue.get() | 1617 task = self.task_queue.get() |
| 1449 caller_id = task.caller_id | 1618 caller_id = task.caller_id |
| 1450 | 1619 |
| 1451 # Get the instance of the command with the appropriate context. | 1620 # Get the instance of the command with the appropriate context. |
| 1452 cls = self.cached_classes.get(caller_id, None) | 1621 cls = self.cached_classes.get(caller_id, None) |
| 1453 if not cls: | 1622 if not cls: |
| 1454 cls = copy.copy(class_map[caller_id]) | 1623 cls = copy.copy(class_map[caller_id]) |
| 1455 cls.logger = CreateGsutilLogger(cls.command_name) | 1624 cls.logger = CreateGsutilLogger(cls.command_name) |
| 1456 self.cached_classes[caller_id] = cls | 1625 self.cached_classes[caller_id] = cls |
| 1457 | 1626 |
| 1458 self.PerformTask(task, cls) | 1627 self.PerformTask(task, cls) |
| 1459 | 1628 |
| 1460 | 1629 |
| 1461 class _SharedVariablesUpdater(object): | 1630 class _SharedVariablesUpdater(object): |
| 1462 """Used to update shared variables for a class in the global map. Note that | 1631 """Used to update shared variables for a class in the global map. |
| 1463 each thread will have its own instance of the calling class for context, | 1632 |
| 1464 and it will also have its own instance of a _SharedVariablesUpdater. | 1633 Note that each thread will have its own instance of the calling class for |
| 1465 This is used in the following way: | 1634 context, and it will also have its own instance of a |
| 1635 _SharedVariablesUpdater. This is used in the following way: |
| 1466 | 1636 |
| 1467 1. Before any tasks are performed, each thread will get a copy of the | 1637 1. Before any tasks are performed, each thread will get a copy of the |
| 1468 calling class, and the globally-consistent value of this shared variable | 1638 calling class, and the globally-consistent value of this shared variable |
| 1469 will be initialized to whatever it was before the call to Apply began. | 1639 will be initialized to whatever it was before the call to Apply began. |
| 1470 | 1640 |
| 1471 2. After each time a thread performs a task, it will look at the current | 1641 2. After each time a thread performs a task, it will look at the current |
| 1472 values of the shared variables in its instance of the calling class. | 1642 values of the shared variables in its instance of the calling class. |
| 1473 | 1643 |
| 1474 2.A. For each such variable, it computes the delta of this variable | 1644 2.A. For each such variable, it computes the delta of this variable |
| 1475 between the last known value for this class (which is stored in | 1645 between the last known value for this class (which is stored in |
| 1476 a dict local to this class) and the current value of the variable | 1646 a dict local to this class) and the current value of the variable |
| 1477 in the class. | 1647 in the class. |
| 1478 | 1648 |
| 1479 2.B. Using this delta, we update the last known value locally as well | 1649 2.B. Using this delta, we update the last known value locally as well |
| 1480 as the globally-consistent value shared across all classes (the | 1650 as the globally-consistent value shared across all classes (the |
| 1481 globally consistent value is simply increased by the computed | 1651 globally consistent value is simply increased by the computed |
| 1482 delta). | 1652 delta). |
| 1483 """ | 1653 """ |
| 1654 |
| 1484 def __init__(self): | 1655 def __init__(self): |
| 1485 self.last_shared_var_values = {} | 1656 self.last_shared_var_values = {} |
| 1486 | 1657 |
| 1487 def Update(self, caller_id, cls): | 1658 def Update(self, caller_id, cls): |
| 1488 """Update any shared variables with their deltas.""" | 1659 """Update any shared variables with their deltas.""" |
| 1489 shared_vars = shared_vars_list_map.get(caller_id, None) | 1660 shared_vars = shared_vars_list_map.get(caller_id, None) |
| 1490 if shared_vars: | 1661 if shared_vars: |
| 1491 for name in shared_vars: | 1662 for name in shared_vars: |
| 1492 key = (caller_id, name) | 1663 key = (caller_id, name) |
| 1493 last_value = self.last_shared_var_values.get(key, 0) | 1664 last_value = self.last_shared_var_values.get(key, 0) |
| 1494 # Compute the change made since the last time we updated here. This is | 1665 # Compute the change made since the last time we updated here. This is |
| 1495 # calculated by simply subtracting the last known value from the current | 1666 # calculated by simply subtracting the last known value from the current |
| 1496 # value in the class instance. | 1667 # value in the class instance. |
| 1497 delta = getattr(cls, name) - last_value | 1668 delta = getattr(cls, name) - last_value |
| 1498 self.last_shared_var_values[key] = delta + last_value | 1669 self.last_shared_var_values[key] = delta + last_value |
| 1499 | 1670 |
| 1500 # Update the globally-consistent value by simply increasing it by the | 1671 # Update the globally-consistent value by simply increasing it by the |
| 1501 # computed delta. | 1672 # computed delta. |
| 1502 shared_vars_map.update(key, delta) | 1673 shared_vars_map.Update(key, delta) |
| 1503 | 1674 |
| 1504 | 1675 |
| 1505 def _NotifyIfDone(caller_id, num_done): | 1676 def _NotifyIfDone(caller_id, num_done): |
| 1506 """ | 1677 """Notify any threads waiting for results that something has finished. |
| 1507 Notify any threads that are waiting for results that something has finished. | 1678 |
| 1508 Each waiting thread will then need to check the call_completed_map to see if | 1679 Each waiting thread will then need to check the call_completed_map to see if |
| 1509 its work is done. | 1680 its work is done. |
| 1510 | 1681 |
| 1511 Note that num_done could be calculated here, but it is passed in as an | 1682 Note that num_done could be calculated here, but it is passed in as an |
| 1512 optimization so that we have one less call to a globally-locked data | 1683 optimization so that we have one less call to a globally-locked data |
| 1513 structure. | 1684 structure. |
| 1514 | 1685 |
| 1515 Args: | 1686 Args: |
| 1516 caller_id: The caller_id of the function whose progress we're checking. | 1687 caller_id: The caller_id of the function whose progress we're checking. |
| 1517 num_done: The number of tasks currently completed for that caller_id. | 1688 num_done: The number of tasks currently completed for that caller_id. |
| 1518 """ | 1689 """ |
| 1519 num_to_do = total_tasks[caller_id] | 1690 num_to_do = total_tasks[caller_id] |
| 1520 if num_to_do == num_done and num_to_do >= 0: | 1691 if num_to_do == num_done and num_to_do >= 0: |
| 1521 # Notify the Apply call that's sleeping that it's ready to return. | 1692 # Notify the Apply call that's sleeping that it's ready to return. |
| 1522 with need_pool_or_done_cond: | 1693 with need_pool_or_done_cond: |
| 1523 call_completed_map[caller_id] = True | 1694 call_completed_map[caller_id] = True |
| 1524 need_pool_or_done_cond.notify_all() | 1695 need_pool_or_done_cond.notify_all() |
| 1525 | 1696 |
| 1697 |
| 1526 def ShutDownGsutil(): | 1698 def ShutDownGsutil(): |
| 1527 """Shut down all processes in consumer pools in preparation for exiting.""" | 1699 """Shut down all processes in consumer pools in preparation for exiting.""" |
| 1528 for q in queues: | 1700 for q in queues: |
| 1529 try: | 1701 try: |
| 1530 q.cancel_join_thread() | 1702 q.cancel_join_thread() |
| 1531 except: | 1703 except: # pylint: disable=bare-except |
| 1532 pass | 1704 pass |
| 1533 for consumer_pool in consumer_pools: | 1705 for consumer_pool in consumer_pools: |
| 1534 consumer_pool.ShutDown() | 1706 consumer_pool.ShutDown() |
| 1535 | 1707 |
| 1708 |
| 1709 # pylint: disable=global-variable-undefined |
| 1710 def _IncrementFailureCount(): |
| 1711 global failure_count |
| 1712 if isinstance(failure_count, int): |
| 1713 failure_count += 1 |
| 1714 else: # Otherwise it's a multiprocessing.Value() of type 'i'. |
| 1715 failure_count.value += 1 |
| 1716 |
| 1717 |
| 1718 # pylint: disable=global-variable-undefined |
| 1719 def GetFailureCount(): |
| 1720 """Returns the number of failures processed during calls to Apply().""" |
| 1721 try: |
| 1722 if isinstance(failure_count, int): |
| 1723 return failure_count |
| 1724 else: # It's a multiprocessing.Value() of type 'i'. |
| 1725 return failure_count.value |
| 1726 except NameError: # If it wasn't initialized, Apply() wasn't called. |
| 1727 return 0 |
| 1728 |
| 1729 |
| 1730 def ResetFailureCount(): |
| 1731 """Resets the failure_count variable to 0 - useful if error is expected.""" |
| 1732 try: |
| 1733 global failure_count |
| 1734 if isinstance(failure_count, int): |
| 1735 failure_count = 0 |
| 1736 else: # It's a multiprocessing.Value() of type 'i'. |
| 1737 failure_count = multiprocessing.Value('i', 0) |
| 1738 except NameError: # If it wasn't initialized, Apply() wasn't called. |
| 1739 pass |
| OLD | NEW |