Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(33)

Side by Side Diff: gslib/command.py

Issue 698893003: Update checked in version of gsutil to version 4.6 (Closed) Base URL: http://dart.googlecode.com/svn/third_party/gsutil/
Patch Set: Created 6 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « gslib/cloud_api_helper.py ('k') | gslib/command_runner.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # -*- coding: utf-8 -*-
1 # Copyright 2010 Google Inc. All Rights Reserved. 2 # Copyright 2010 Google Inc. All Rights Reserved.
2 # 3 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); 4 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License. 5 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at 6 # You may obtain a copy of the License at
6 # 7 #
7 # http://www.apache.org/licenses/LICENSE-2.0 8 # http://www.apache.org/licenses/LICENSE-2.0
8 # 9 #
9 # Unless required by applicable law or agreed to in writing, software 10 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, 11 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and 13 # See the License for the specific language governing permissions and
13 # limitations under the License. 14 # limitations under the License.
14
15 """Base class for gsutil commands. 15 """Base class for gsutil commands.
16 16
17 In addition to base class code, this file contains helpers that depend on base 17 In addition to base class code, this file contains helpers that depend on base
18 class state (such as GetAclCommandHelper) In general, functions that depend on 18 class state (such as GetAndPrintAcl) In general, functions that depend on
19 class state and that are used by multiple commands belong in this file. 19 class state and that are used by multiple commands belong in this file.
20 Functions that don't depend on class state belong in util.py, and non-shared 20 Functions that don't depend on class state belong in util.py, and non-shared
21 helpers belong in individual subclasses. 21 helpers belong in individual subclasses.
22 """ 22 """
23 23
24 import boto 24 from __future__ import absolute_import
25
25 import codecs 26 import codecs
27 from collections import namedtuple
26 import copy 28 import copy
27 import getopt 29 import getopt
28 import gslib 30 from getopt import GetoptError
29 import logging 31 import logging
30 import multiprocessing 32 import multiprocessing
31 import os 33 import os
32 import Queue 34 import Queue
33 import re
34 import signal 35 import signal
35 import sys 36 import sys
36 import textwrap 37 import textwrap
37 import threading 38 import threading
38 import traceback 39 import traceback
39 import wildcard_iterator
40 import xml.dom.minidom
41 40
42 from boto import handler 41 import boto
43 from boto.exception import GSResponseError
44 from boto.storage_uri import StorageUri 42 from boto.storage_uri import StorageUri
45 from collections import namedtuple 43 import gslib
46 from getopt import GetoptError 44 from gslib.cloud_api import AccessDeniedException
47 from gslib import util 45 from gslib.cloud_api import ArgumentException
46 from gslib.cloud_api import ServiceException
47 from gslib.cloud_api_delegator import CloudApiDelegator
48 from gslib.cs_api_map import ApiSelector
49 from gslib.cs_api_map import GsutilApiMapFactory
48 from gslib.exception import CommandException 50 from gslib.exception import CommandException
49 from gslib.help_provider import HelpProvider 51 from gslib.help_provider import HelpProvider
50 from gslib.name_expansion import NameExpansionIterator 52 from gslib.name_expansion import NameExpansionIterator
51 from gslib.name_expansion import NameExpansionIteratorQueue 53 from gslib.name_expansion import NameExpansionResult
52 from gslib.parallelism_framework_util import AtomicIncrementDict 54 from gslib.parallelism_framework_util import AtomicIncrementDict
53 from gslib.parallelism_framework_util import BasicIncrementDict 55 from gslib.parallelism_framework_util import BasicIncrementDict
54 from gslib.parallelism_framework_util import ThreadAndProcessSafeDict 56 from gslib.parallelism_framework_util import ThreadAndProcessSafeDict
55 from gslib.project_id import ProjectIdHandler 57 from gslib.plurality_checkable_iterator import PluralityCheckableIterator
56 from gslib.storage_uri_builder import StorageUriBuilder 58 from gslib.storage_url import StorageUrlFromString
59 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_m essages
60 from gslib.translation_helper import AclTranslation
57 from gslib.util import GetConfigFilePath 61 from gslib.util import GetConfigFilePath
62 from gslib.util import HaveFileUrls
63 from gslib.util import HaveProviderUrls
58 from gslib.util import IS_WINDOWS 64 from gslib.util import IS_WINDOWS
59 from gslib.util import MultiprocessingIsAvailable 65 from gslib.util import MultiprocessingIsAvailable
60 from gslib.util import NO_MAX 66 from gslib.util import NO_MAX
61 from gslib.wildcard_iterator import ContainsWildcard 67 from gslib.util import UrlsAreForSingleProvider
62 from oauth2client.client import HAS_CRYPTO 68 from gslib.util import UTF8
69 from gslib.wildcard_iterator import CreateWildcardIterator
70
71 OFFER_GSUTIL_M_SUGGESTION_THRESHOLD = 5
63 72
64 if IS_WINDOWS: 73 if IS_WINDOWS:
65 import ctypes 74 import ctypes # pylint: disable=g-import-not-at-top
66 75
67 76
68 def _DefaultExceptionHandler(cls, e): 77 def _DefaultExceptionHandler(cls, e):
69 cls.logger.exception(e) 78 cls.logger.exception(e)
70 79
80
def CreateGsutilLogger(command_name):
  """Creates a logger that resembles 'print' output.

  This logger abides by gsutil -d/-D/-DD/-q options.

  By default (if none of the above options is specified) the logger will display
  all messages logged with level INFO or above. Log propagation is disabled.

  Args:
    command_name: Command name to create logger for.

  Returns:
    A logger object.
  """
  command_logger = logging.getLogger(command_name)
  command_logger.propagate = False
  command_logger.setLevel(logging.root.level)
  stream_handler = logging.StreamHandler()
  stream_handler.setFormatter(logging.Formatter('%(message)s'))
  # Commands that invoke other commands (e.g. mv) would otherwise attach a
  # duplicate handler on every invocation, so only attach when none exist.
  if not command_logger.handlers:
    command_logger.addHandler(stream_handler)
  return command_logger
91 105
92 def _UriArgChecker(command_instance, uri): 106
93 exp_src_uri = command_instance.suri_builder.StorageUri( 107 def _UrlArgChecker(command_instance, url):
94 uri.GetExpandedUriStr()) 108 if not command_instance.exclude_symlinks:
95 command_instance.logger.debug('process %d is handling uri %s', 109 return True
96 os.getpid(), exp_src_uri) 110 exp_src_url = url.expanded_storage_url
97 if (command_instance.exclude_symlinks and exp_src_uri.is_file_uri() 111 if exp_src_url.IsFileUrl() and os.path.islink(exp_src_url.object_name):
98 and os.path.islink(exp_src_uri.object_name)): 112 command_instance.logger.info('Skipping symbolic link %s...', exp_src_url)
99 command_instance.logger.info('Skipping symbolic link %s...', exp_src_uri)
100 return False 113 return False
101 return True 114 return True
102 115
103 116
def DummyArgChecker(*unused_args):
  """No-op argument checker: accepts every task argument."""
  return True
106 119
107 def _SetAclFuncWrapper(cls, name_expansion_result):
108 return cls._SetAclFunc(name_expansion_result)
109 120
def SetAclFuncWrapper(cls, name_expansion_result, thread_state=None):
  """Module-level wrapper so SetAclFunc can be passed to Apply (picklable)."""
  return cls.SetAclFunc(name_expansion_result, thread_state=thread_state)
123
124
def SetAclExceptionHandler(cls, e):
  """Exception handler that maintains state about post-completion status."""
  # Record that at least one ACL operation failed so the command can exit
  # with an error after all tasks have run.
  cls.logger.error(str(e))
  cls.everything_set_okay = False
114 129
115 # We will keep this list of all thread- or process-safe queues ever created by 130 # We will keep this list of all thread- or process-safe queues ever created by
116 # the main thread so that we can forcefully kill them upon shutdown. Otherwise, 131 # the main thread so that we can forcefully kill them upon shutdown. Otherwise,
117 # we encounter a Python bug in which empty queues block forever on join (which 132 # we encounter a Python bug in which empty queues block forever on join (which
118 # is called as part of the Python exit function cleanup) under the impression 133 # is called as part of the Python exit function cleanup) under the impression
119 # that they are non-empty. 134 # that they are non-empty.
120 # However, this also lets us shut down somewhat more cleanly when interrupted. 135 # However, this also lets us shut down somewhat more cleanly when interrupted.
121 queues = [] 136 queues = []
122 137
138
123 def _NewMultiprocessingQueue(): 139 def _NewMultiprocessingQueue():
124 queue = multiprocessing.Queue(MAX_QUEUE_SIZE) 140 queue = multiprocessing.Queue(MAX_QUEUE_SIZE)
125 queues.append(queue) 141 queues.append(queue)
126 return queue 142 return queue
127 143
144
128 def _NewThreadsafeQueue(): 145 def _NewThreadsafeQueue():
129 queue = Queue.Queue(MAX_QUEUE_SIZE) 146 queue = Queue.Queue(MAX_QUEUE_SIZE)
130 queues.append(queue) 147 queues.append(queue)
131 return queue 148 return queue
132 149
133
134 # command_spec key constants.
135 COMMAND_NAME = 'command_name'
136 COMMAND_NAME_ALIASES = 'command_name_aliases'
137 MIN_ARGS = 'min_args'
138 MAX_ARGS = 'max_args'
139 SUPPORTED_SUB_ARGS = 'supported_sub_args'
140 FILE_URIS_OK = 'file_uri_ok'
141 PROVIDER_URIS_OK = 'provider_uri_ok'
142 URIS_START_ARG = 'uris_start_arg'
143
# The maximum size of a process- or thread-safe queue. Imposing this limit
# keeps us from holding an arbitrary amount of data in memory. However,
# setting this number too high (e.g., >= 32768 on OS X) can cause problems on
# some operating systems.
MAX_QUEUE_SIZE = 32500

# The maximum depth of the tree of recursive calls to command.Apply. This is
# an arbitrary limit put in place to prevent developers from accidentally
# causing problems with infinite recursion, and it can be increased if needed.
MAX_RECURSIVE_DEPTH = 5

ZERO_TASKS_TO_DO_ARGUMENT = ('There were no', 'tasks to do')

# Map from deprecated aliases to the current command and subcommands that
# provide the same behavior.
# TODO: Remove this map and deprecate old commands on 9/9/14.
OLD_ALIAS_MAP = {
    'chacl': ['acl', 'ch'],
    'getacl': ['acl', 'get'],
    'setacl': ['acl', 'set'],
    'getcors': ['cors', 'get'],
    'setcors': ['cors', 'set'],
    'chdefacl': ['defacl', 'ch'],
    'getdefacl': ['defacl', 'get'],
    'setdefacl': ['defacl', 'set'],
    'disablelogging': ['logging', 'set', 'off'],
    'enablelogging': ['logging', 'set', 'on'],
    'getlogging': ['logging', 'get'],
    'getversioning': ['versioning', 'get'],
    'setversioning': ['versioning', 'set'],
    'getwebcfg': ['web', 'get'],
    'setwebcfg': ['web', 'set'],
}


# Declare all of the module level variables - see
# InitializeMultiprocessingVariables for an explanation of why this is
# necessary.
# pylint: disable=global-at-module-level
global manager, consumer_pools, task_queues, caller_id_lock, caller_id_counter
global total_tasks, call_completed_map, global_return_values_map
global need_pool_or_done_cond, caller_id_finished_count, new_pool_needed
global current_max_recursive_level, shared_vars_map, shared_vars_list_map
global class_map, worker_checking_level_lock, failure_count
185 192
186 193
def InitializeMultiprocessingVariables():
  """Initializes module-level variables that will be inherited by subprocesses.

  On Windows, a multiprocessing.Manager object should only
  be created within an "if __name__ == '__main__':" block. This function
  must be called, otherwise every command that calls Command.Apply will fail.
  """
  # This list of globals must exactly match the module-level declarations.
  # pylint: disable=global-variable-undefined
  global manager, consumer_pools, task_queues, caller_id_lock, caller_id_counter
  global total_tasks, call_completed_map, global_return_values_map
  global need_pool_or_done_cond, caller_id_finished_count, new_pool_needed
  global current_max_recursive_level, shared_vars_map, shared_vars_list_map
  global class_map, worker_checking_level_lock, failure_count

  manager = multiprocessing.Manager()

  consumer_pools = []

  # Every existing task queue; pools scan this list to find the queue
  # appropriate for a given recursive_apply_level.
  task_queues = []

  # Used to hand out a globally unique caller ID for each Apply call.
  caller_id_lock = manager.Lock()
  caller_id_counter = multiprocessing.Value('i', 0)

  # Map from caller_id to total number of tasks to be completed for that ID.
  total_tasks = ThreadAndProcessSafeDict(manager)

  # Map from caller_id to a boolean which is True iff all its tasks are
  # finished.
  call_completed_map = ThreadAndProcessSafeDict(manager)

  # Tracks the set of return values for each caller ID.
  global_return_values_map = AtomicIncrementDict(manager)

  # Wakes any waiting threads when a task finishes, or when a call to Apply
  # needs a new set of consumer processes.
  need_pool_or_done_cond = manager.Condition()

  # Prevents multiple worker processes from asking the main thread to create
  # a new consumer pool for the same level.
  worker_checking_level_lock = manager.Lock()

  # Map from caller_id to the current number of completed tasks for that ID.
  caller_id_finished_count = AtomicIncrementDict(manager)

  # Lets the main thread tell apart being woken up by another call finishing
  # from being woken up by a call that needs a new set of consumer processes.
  new_pool_needed = multiprocessing.Value('i', 0)

  current_max_recursive_level = multiprocessing.Value('i', 0)

  # Map from (caller_id, name) to the value of that shared variable.
  shared_vars_map = AtomicIncrementDict(manager)
  shared_vars_list_map = ThreadAndProcessSafeDict(manager)

  # Map from caller_id to calling class.
  class_map = manager.dict()

  # Number of tasks that resulted in an exception in calls to Apply().
  failure_count = multiprocessing.Value('i', 0)
245 259
246 class Command(object):
247 REQUIRED_SPEC_KEYS = [COMMAND_NAME]
248 260
# Each subclass of Command must define a property named 'command_spec' that is
# an instance of the following class.
CommandSpec = namedtuple('CommandSpec', [
    'command_name',            # Name of command.
    'command_name_aliases',    # List of command name aliases.
    'min_args',                # Min number of args required by this command.
    'max_args',                # Max number of args required, or NO_MAX.
    'supported_sub_args',      # Getopt-style string of acceptable sub args.
    'file_url_ok',             # True if file URLs are acceptable.
    'provider_url_ok',         # True if provider-only URLs are acceptable.
    'urls_start_arg',          # Index in args of first URL arg.
    'gs_api_support',          # List of supported APIs.
    'gs_default_api',          # Default API to use for this command.
    'supported_private_args',  # Private arguments (for internal testing).
])
273 287
274 """Define an empty test specification, which derived classes must populate.
275 288
276 This is a list of tuples containing the following values: 289 class Command(HelpProvider):
290 """Base class for all gsutil commands."""
277 291
278 step_name - mnemonic name for test, displayed when test is run 292 # Each subclass must override this with an instance of CommandSpec.
279 cmd_line - shell command line to run test 293 command_spec = None
280 expect_ret or None - expected return code from test (None means ignore)
281 (result_file, expect_file) or None - tuple of result file and expected
282 file to diff for additional test
283 verification beyond the return code
284 (None means no diff requested)
285 Notes:
286 294
287 - Setting expected_ret to None means there is no expectation and, 295 _commands_with_subcommands_and_subopts = ['acl', 'defacl', 'logging', 'web',
288 hence, any returned value will pass. 296 'notification']
289 297
290 - Any occurrences of the string 'gsutil' in the cmd_line parameter
291 are expanded to the full path to the gsutil command under test.
292
293 - The cmd_line, result_file and expect_file parameters may
294 contain the following special substrings:
295
296 $Bn - converted to one of 10 unique-for-testing bucket names (n=0..9)
297 $On - converted to one of 10 unique-for-testing object names (n=0..9)
298 $Fn - converted to one of 10 unique-for-testing file names (n=0..9)
299 $G - converted to the directory where gsutil is installed. Useful for
300 referencing test data.
301
302 - The generated file names are full pathnames, whereas the generated
303 bucket and object names are simple relative names.
304
305 - Tests with a non-None result_file and expect_file automatically
306 trigger an implicit diff of the two files.
307
308 - These test specifications, in combination with the conversion strings
309 allow tests to be constructed parametrically. For example, here's an
310 annotated subset of a test_steps for the cp command:
311
312 # Copy local file to object, verify 0 return code.
313 ('simple cp', 'gsutil cp $F1 gs://$B1/$O1', 0, None, None),
314 # Copy uploaded object back to local file and diff vs. orig file.
315 ('verify cp', 'gsutil cp gs://$B1/$O1 $F2', 0, '$F2', '$F1'),
316
317 - After pattern substitution, the specs are run sequentially, in the
318 order in which they appear in the test_steps list.
319 """
320 test_steps = []
321
322 # This keeps track of the recursive depth of the current call to Apply. 298 # This keeps track of the recursive depth of the current call to Apply.
323 recursive_apply_level = 0 299 recursive_apply_level = 0
324 300
325 # If the multiprocessing module isn't available, we'll use this to keep track 301 # If the multiprocessing module isn't available, we'll use this to keep track
326 # of the caller_id. 302 # of the caller_id.
327 sequential_caller_id = -1 303 sequential_caller_id = -1
328 304
305 @staticmethod
306 def CreateCommandSpec(command_name, command_name_aliases=None, min_args=0,
307 max_args=NO_MAX, supported_sub_args='',
308 file_url_ok=False, provider_url_ok=False,
309 urls_start_arg=0, gs_api_support=None,
310 gs_default_api=None, supported_private_args=None):
311 """Creates an instance of CommandSpec, with defaults."""
312 return CommandSpec(
313 command_name=command_name,
314 command_name_aliases=command_name_aliases or [],
315 min_args=min_args,
316 max_args=max_args,
317 supported_sub_args=supported_sub_args,
318 file_url_ok=file_url_ok,
319 provider_url_ok=provider_url_ok,
320 urls_start_arg=urls_start_arg,
321 gs_api_support=gs_api_support or [ApiSelector.XML],
322 gs_default_api=gs_default_api or ApiSelector.XML,
323 supported_private_args=supported_private_args)
324
329 # Define a convenience property for command name, since it's used many places. 325 # Define a convenience property for command name, since it's used many places.
330 def _GetDefaultCommandName(self): 326 def _GetDefaultCommandName(self):
331 return self.command_spec[COMMAND_NAME] 327 return self.command_spec.command_name
332 command_name = property(_GetDefaultCommandName) 328 command_name = property(_GetDefaultCommandName)
333 329
334 def _CalculateUrisStartArg(self): 330 def _CalculateUrlsStartArg(self):
335 """Calculate the index in args of the first URI arg. By default, just use 331 """Calculate the index in args of the first URL arg.
336 the value from command_spec. 332
333 Returns:
334 Index of the first URL arg (according to the command spec).
337 """ 335 """
338 return self.command_spec[URIS_START_ARG] 336 return self.command_spec.urls_start_arg
339 337
340 def _TranslateDeprecatedAliases(self, args): 338 def _TranslateDeprecatedAliases(self, args):
341 """For commands that have deprecated aliases, this will map the aliases to 339 """Map deprecated aliases to the corresponding new command, and warn."""
342 the corresponding new command and also warn the user about deprecation.
343 """
344 new_command_args = OLD_ALIAS_MAP.get(self.command_alias_used, None) 340 new_command_args = OLD_ALIAS_MAP.get(self.command_alias_used, None)
345 if new_command_args: 341 if new_command_args:
346 # Prepend any subcommands for the new command. The command name itself 342 # Prepend any subcommands for the new command. The command name itself
347 # is not part of the args, so leave it out. 343 # is not part of the args, so leave it out.
348 args = new_command_args[1:] + args 344 args = new_command_args[1:] + args
349 self.logger.warn('\n'.join(textwrap.wrap(( 345 self.logger.warn('\n'.join(textwrap.wrap(
350 'You are using a deprecated alias, "%(used_alias)s", for the ' 346 ('You are using a deprecated alias, "%(used_alias)s", for the '
351 '"%(command_name)s" command. This will stop working on 9/9/2014. ' 347 '"%(command_name)s" command. This will stop working on 9/9/2014. '
352 'Please use "%(command_name)s" with the appropriate sub-command in ' 348 'Please use "%(command_name)s" with the appropriate sub-command in '
353 'the future. See "gsutil help %(command_name)s" for details.') % 349 'the future. See "gsutil help %(command_name)s" for details.') %
354 {'used_alias': self.command_alias_used, 350 {'used_alias': self.command_alias_used,
355 'command_name': self.command_name }))) 351 'command_name': self.command_name})))
356 return args 352 return args
357 353
358 def __init__(self, command_runner, args, headers, debug, parallel_operations, 354 def __init__(self, command_runner, args, headers, debug, parallel_operations,
359 config_file_list, bucket_storage_uri_class, test_method=None, 355 bucket_storage_uri_class, gsutil_api_class_map_factory,
360 logging_filters=None, command_alias_used=None): 356 test_method=None, logging_filters=None,
361 """ 357 command_alias_used=None):
358 """Instantiates a Command.
359
362 Args: 360 Args:
363 command_runner: CommandRunner (for commands built atop other commands). 361 command_runner: CommandRunner (for commands built atop other commands).
364 args: Command-line args (arg0 = actual arg, not command name ala bash). 362 args: Command-line args (arg0 = actual arg, not command name ala bash).
365 headers: Dictionary containing optional HTTP headers to pass to boto. 363 headers: Dictionary containing optional HTTP headers to pass to boto.
366 debug: Debug level to pass in to boto connection (range 0..3). 364 debug: Debug level to pass in to boto connection (range 0..3).
367 parallel_operations: Should command operations be executed in parallel? 365 parallel_operations: Should command operations be executed in parallel?
368 config_file_list: Config file list returned by GetBotoConfigFileList().
369 bucket_storage_uri_class: Class to instantiate for cloud StorageUris. 366 bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
370 Settable for testing/mocking. 367 Settable for testing/mocking.
368 gsutil_api_class_map_factory: Creates map of cloud storage interfaces.
369 Settable for testing/mocking.
371 test_method: Optional general purpose method for testing purposes. 370 test_method: Optional general purpose method for testing purposes.
372 Application and semantics of this method will vary by 371 Application and semantics of this method will vary by
373 command and test type. 372 command and test type.
374 logging_filters: Optional list of logging.Filters to apply to this 373 logging_filters: Optional list of logging.Filters to apply to this
375 command's logger. 374 command's logger.
376 command_alias_used: The alias that was actually used when running this 375 command_alias_used: The alias that was actually used when running this
377 command (as opposed to the "official" command name, 376 command (as opposed to the "official" command name,
378 which will always correspond to the file name). 377 which will always correspond to the file name).
379 378
380 Implementation note: subclasses shouldn't need to define an __init__ 379 Implementation note: subclasses shouldn't need to define an __init__
381 method, and instead depend on the shared initialization that happens 380 method, and instead depend on the shared initialization that happens
382 here. If you do define an __init__ method in a subclass you'll need to 381 here. If you do define an __init__ method in a subclass you'll need to
383 explicitly call super().__init__(). But you're encouraged not to do this, 382 explicitly call super().__init__(). But you're encouraged not to do this,
384 because it will make changing the __init__ interface more painful. 383 because it will make changing the __init__ interface more painful.
385 """ 384 """
386 # Save class values from constructor params. 385 # Save class values from constructor params.
387 self.command_runner = command_runner 386 self.command_runner = command_runner
388 self.unparsed_args = args 387 self.unparsed_args = args
389 self.headers = headers 388 self.headers = headers
390 self.debug = debug 389 self.debug = debug
391 self.parallel_operations = parallel_operations 390 self.parallel_operations = parallel_operations
392 self.config_file_list = config_file_list
393 self.bucket_storage_uri_class = bucket_storage_uri_class 391 self.bucket_storage_uri_class = bucket_storage_uri_class
392 self.gsutil_api_class_map_factory = gsutil_api_class_map_factory
394 self.test_method = test_method 393 self.test_method = test_method
395 self.exclude_symlinks = False 394 self.exclude_symlinks = False
396 self.recursion_requested = False 395 self.recursion_requested = False
397 self.all_versions = False 396 self.all_versions = False
398 self.command_alias_used = command_alias_used 397 self.command_alias_used = command_alias_used
399 398
400 # Global instance of a threaded logger object. 399 # Global instance of a threaded logger object.
401 self.logger = CreateGsutilLogger(self.command_name) 400 self.logger = CreateGsutilLogger(self.command_name)
402 if logging_filters: 401 if logging_filters:
403 for filter in logging_filters: 402 for log_filter in logging_filters:
404 self.logger.addFilter(filter) 403 self.logger.addFilter(log_filter)
405 404
406 # Process sub-command instance specifications. 405 if self.command_spec is None:
407 # First, ensure subclass implementation sets all required keys. 406 raise CommandException('"%s" command implementation is missing a '
408 for k in self.REQUIRED_SPEC_KEYS: 407 'command_spec definition.' % self.command_name)
409 if k not in self.command_spec or self.command_spec[k] is None:
410 raise CommandException('"%s" command implementation is missing %s '
411 'specification' % (self.command_name, k))
412 # Now override default command_spec with subclass-specified values.
413 tmp = self._default_command_spec
414 tmp.update(self.command_spec)
415 self.command_spec = tmp
416 del tmp
417
418 # Make sure command provides a test specification.
419 if not self.test_steps:
420 # TODO: Uncomment following lines when test feature is ready.
421 #raise CommandException('"%s" command implementation is missing test '
422 #'specification' % self.command_name)
423 pass
424 408
425 # Parse and validate args. 409 # Parse and validate args.
426 args = self._TranslateDeprecatedAliases(args) 410 args = self._TranslateDeprecatedAliases(args)
427 try: 411 try:
428 (self.sub_opts, self.args) = getopt.getopt( 412 (self.sub_opts, self.args) = getopt.getopt(
429 args, self.command_spec[SUPPORTED_SUB_ARGS]) 413 args, self.command_spec.supported_sub_args,
414 self.command_spec.supported_private_args or [])
430 except GetoptError, e: 415 except GetoptError, e:
431 raise CommandException('%s for "%s" command.' % (e.msg, 416 raise CommandException('%s for "%s" command.' % (e.msg,
432 self.command_name)) 417 self.command_name))
433 self.command_spec[URIS_START_ARG] = self._CalculateUrisStartArg() 418 # Named tuple public functions start with _
434 419 # pylint: disable=protected-access
435 if (len(self.args) < self.command_spec[MIN_ARGS] 420 self.command_spec = self.command_spec._replace(
436 or len(self.args) > self.command_spec[MAX_ARGS]): 421 urls_start_arg=self._CalculateUrlsStartArg())
422
423 if (len(self.args) < self.command_spec.min_args
424 or len(self.args) > self.command_spec.max_args):
437 self._RaiseWrongNumberOfArgumentsException() 425 self._RaiseWrongNumberOfArgumentsException()
438 426
439 if not (self.command_name in 427 if self.command_name not in self._commands_with_subcommands_and_subopts:
440 self._commands_with_subcommands_and_subopts):
441 self.CheckArguments() 428 self.CheckArguments()
442 429
443 self.proj_id_handler = ProjectIdHandler() 430 # Build the support and default maps from the command spec.
444 self.suri_builder = StorageUriBuilder(debug, bucket_storage_uri_class) 431 support_map = {
432 'gs': self.command_spec.gs_api_support,
433 's3': [ApiSelector.XML]
434 }
435 default_map = {
436 'gs': self.command_spec.gs_default_api,
437 's3': ApiSelector.XML
438 }
439 self.gsutil_api_map = GsutilApiMapFactory.GetApiMap(
440 self.gsutil_api_class_map_factory, support_map, default_map)
441
442 self.project_id = None
443 self.gsutil_api = CloudApiDelegator(
444 bucket_storage_uri_class, self.gsutil_api_map,
445 self.logger, debug=self.debug)
445 446
446 # Cross-platform path to run gsutil binary. 447 # Cross-platform path to run gsutil binary.
447 self.gsutil_cmd = '' 448 self.gsutil_cmd = ''
448 # Cross-platform list containing gsutil path for use with subprocess.
449 self.gsutil_exec_list = []
450 # If running on Windows, invoke python interpreter explicitly. 449 # If running on Windows, invoke python interpreter explicitly.
451 if gslib.util.IS_WINDOWS: 450 if gslib.util.IS_WINDOWS:
452 self.gsutil_cmd += 'python ' 451 self.gsutil_cmd += 'python '
453 self.gsutil_exec_list += ['python']
454 # Add full path to gsutil to make sure we test the correct version. 452 # Add full path to gsutil to make sure we test the correct version.
455 self.gsutil_path = gslib.GSUTIL_PATH 453 self.gsutil_path = gslib.GSUTIL_PATH
456 self.gsutil_cmd += self.gsutil_path 454 self.gsutil_cmd += self.gsutil_path
457 self.gsutil_exec_list += [self.gsutil_path]
458 455
459 # We're treating recursion_requested like it's used by all commands, but 456 # We're treating recursion_requested like it's used by all commands, but
460 # only some of the commands accept the -R option. 457 # only some of the commands accept the -R option.
461 if self.sub_opts: 458 if self.sub_opts:
462 for o, unused_a in self.sub_opts: 459 for o, unused_a in self.sub_opts:
463 if o == '-r' or o == '-R': 460 if o == '-r' or o == '-R':
464 self.recursion_requested = True 461 self.recursion_requested = True
465 break 462 break
466 463
467 self.multiprocessing_is_available = MultiprocessingIsAvailable()[0] 464 self.multiprocessing_is_available = MultiprocessingIsAvailable()[0]
468 465
469 def _RaiseWrongNumberOfArgumentsException(self): 466 def _RaiseWrongNumberOfArgumentsException(self):
470 """Raise an exception indicating that the wrong number of arguments was 467 """Raises exception for wrong number of arguments supplied to command."""
471 provided for this command. 468 if len(self.args) > self.command_spec.max_args:
472 """
473 if len(self.args) > self.command_spec[MAX_ARGS]:
474 message = ('The %s command accepts at most %d arguments.' % 469 message = ('The %s command accepts at most %d arguments.' %
475 (self.command_name, self.command_spec[MAX_ARGS])) 470 (self.command_name, self.command_spec.max_args))
476 elif len(self.args) < self.command_spec[MIN_ARGS]: 471 elif len(self.args) < self.command_spec.min_args:
477 message = ('The %s command requires at least %d arguments.' % 472 message = ('The %s command requires at least %d arguments.' %
478 (self.command_name, self.command_spec[MIN_ARGS])) 473 (self.command_name, self.command_spec.min_args))
479 raise CommandException(message) 474 raise CommandException(message)
480 475
481 def CheckArguments(self): 476 def CheckArguments(self):
482 """Checks that the arguments provided on the command line fit the 477 """Checks that command line arguments match the command_spec.
483 expectations of the command_spec. Any commands in
484 self._commands_with_subcommands_and_subopts are responsible for calling
485 this method after handling initial parsing of their arguments.
486 This prevents commands with sub-commands as well as options from breaking
487 the parsing of getopt.
488 478
489 TODO: Provide a function to parse commands and sub-commands more 479 Any commands in self._commands_with_subcommands_and_subopts are responsible
490 intelligently once we stop allowing the deprecated command versions. 480 for calling this method after handling initial parsing of their arguments.
481 This prevents commands with sub-commands as well as options from breaking
482 the parsing of getopt.
483
484 TODO: Provide a function to parse commands and sub-commands more
485 intelligently once we stop allowing the deprecated command versions.
486
487 Raises:
488 CommandException if the arguments don't match.
491 """ 489 """
492 490
493 if (not self.command_spec[FILE_URIS_OK] 491 if (not self.command_spec.file_url_ok
494 and self.HaveFileUris(self.args[self.command_spec[URIS_START_ARG]:])): 492 and HaveFileUrls(self.args[self.command_spec.urls_start_arg:])):
495 raise CommandException('"%s" command does not support "file://" URIs. ' 493 raise CommandException('"%s" command does not support "file://" URLs. '
496 'Did you mean to use a gs:// URI?' % 494 'Did you mean to use a gs:// URL?' %
497 self.command_name) 495 self.command_name)
498 if (not self.command_spec[PROVIDER_URIS_OK] 496 if (not self.command_spec.provider_url_ok
499 and self._HaveProviderUris( 497 and HaveProviderUrls(self.args[self.command_spec.urls_start_arg:])):
500 self.args[self.command_spec[URIS_START_ARG]:])):
501 raise CommandException('"%s" command does not support provider-only ' 498 raise CommandException('"%s" command does not support provider-only '
502 'URIs.' % self.command_name) 499 'URLs.' % self.command_name)
503 500
504 def WildcardIterator(self, uri_or_str, all_versions=False): 501 def WildcardIterator(self, url_string, all_versions=False):
505 """ 502 """Helper to instantiate gslib.WildcardIterator.
506 Helper to instantiate gslib.WildcardIterator. Args are same as 503
507 gslib.WildcardIterator interface, but this method fills in most of the 504 Args are same as gslib.WildcardIterator interface, but this method fills in
508 values from instance state. 505 most of the values from instance state.
509 506
510 Args: 507 Args:
511 uri_or_str: StorageUri or URI string naming wildcard objects to iterate. 508 url_string: URL string naming wildcard objects to iterate.
509 all_versions: If true, the iterator yields all versions of objects
510 matching the wildcard. If false, yields just the live
511 object version.
512
513 Returns:
514 WildcardIterator for use by caller.
512 """ 515 """
513 return wildcard_iterator.wildcard_iterator( 516 return CreateWildcardIterator(
514 uri_or_str, self.proj_id_handler, 517 url_string, self.gsutil_api, all_versions=all_versions,
515 bucket_storage_uri_class=self.bucket_storage_uri_class, 518 debug=self.debug, project_id=self.project_id)
516 all_versions=all_versions,
517 headers=self.headers, debug=self.debug)
518 519
519 def RunCommand(self): 520 def RunCommand(self):
520 """Abstract function in base class. Subclasses must implement this. The 521 """Abstract function in base class. Subclasses must implement this.
521 return value of this function will be used as the exit status of the 522
523 The return value of this function will be used as the exit status of the
522 process, so subclass commands should return an integer exit code (0 for 524 process, so subclass commands should return an integer exit code (0 for
523 success, a value in [1,255] for failure). 525 success, a value in [1,255] for failure).
524 """ 526 """
525 raise CommandException('Command %s is missing its RunCommand() ' 527 raise CommandException('Command %s is missing its RunCommand() '
526 'implementation' % self.command_name) 528 'implementation' % self.command_name)
527 529
528 ############################################################ 530 ############################################################
529 # Shared helper functions that depend on base class state. # 531 # Shared helper functions that depend on base class state. #
530 ############################################################ 532 ############################################################
531 533
532 def UrisAreForSingleProvider(self, uri_args): 534 def ApplyAclFunc(self, acl_func, acl_excep_handler, url_strs):
533 """Tests whether the uris are all for a single provider. 535 """Sets the standard or default object ACL depending on self.command_name.
534 536
535 Returns: a StorageUri for one of the uris on success, None on failure. 537 Args:
538 acl_func: ACL function to be passed to Apply.
539 acl_excep_handler: ACL exception handler to be passed to Apply.
540 url_strs: URL strings on which to set ACL.
541
542 Raises:
543 CommandException if an ACL could not be set.
536 """ 544 """
537 provider = None 545 multi_threaded_url_args = []
538 uri = None 546 # Handle bucket ACL setting operations single-threaded, because
539 for uri_str in uri_args: 547 # our threading machinery currently assumes it's working with objects
540 # validate=False because we allow wildcard uris. 548 # (name_expansion_iterator), and normally we wouldn't expect users to need
541 uri = boto.storage_uri( 549 # to set ACLs on huge numbers of buckets at once anyway.
542 uri_str, debug=self.debug, validate=False, 550 for url_str in url_strs:
543 bucket_storage_uri_class=self.bucket_storage_uri_class) 551 url = StorageUrlFromString(url_str)
544 if not provider: 552 if url.IsCloudUrl() and url.IsBucket():
545 provider = uri.scheme 553 if self.recursion_requested:
546 elif uri.scheme != provider: 554 # If user specified -R option, convert any bucket args to bucket
547 return None 555 # wildcards (e.g., gs://bucket/*), to prevent the operation from
548 return uri 556 # being applied to the buckets themselves.
549 557 url.object_name = '*'
550 def _SetAclFunc(self, name_expansion_result): 558 multi_threaded_url_args.append(url.url_string)
551 exp_src_uri = self.suri_builder.StorageUri( 559 else:
552 name_expansion_result.GetExpandedUriStr()) 560 # Convert to a NameExpansionResult so we can re-use the threaded
553 # We don't do bucket operations multi-threaded (see comment below). 561 # function for the single-threaded implementation. RefType is unused.
554 assert self.command_name != 'defacl' 562 for blr in self.WildcardIterator(url.url_string).IterBuckets(
555 self.logger.info('Setting ACL on %s...' % 563 bucket_fields=['id']):
556 name_expansion_result.expanded_uri_str) 564 name_expansion_for_url = NameExpansionResult(
557 try: 565 url, False, False, blr.storage_url)
558 if self.canned: 566 acl_func(self, name_expansion_for_url)
559 exp_src_uri.set_acl(self.acl_arg, exp_src_uri.object_name, False,
560 self.headers)
561 else: 567 else:
562 exp_src_uri.set_xml_acl(self.acl_arg, exp_src_uri.object_name, False, 568 multi_threaded_url_args.append(url_str)
563 self.headers) 569
564 except GSResponseError as e: 570 if len(multi_threaded_url_args) >= 1:
565 if self.continue_on_error: 571 name_expansion_iterator = NameExpansionIterator(
566 exc_name, message, detail = util.ParseErrorDetail(e) 572 self.command_name, self.debug,
567 self.everything_set_okay = False 573 self.logger, self.gsutil_api,
568 sys.stderr.write(util.FormatErrorMessage( 574 multi_threaded_url_args, self.recursion_requested,
569 exc_name, e.status, e.code, e.reason, message, detail)) 575 all_versions=self.all_versions,
570 else: 576 continue_on_error=self.continue_on_error or self.parallel_operations)
577
578 # Perform requests in parallel (-m) mode, if requested, using
579 # configured number of parallel processes and threads. Otherwise,
580 # perform requests with sequential function calls in current process.
581 self.Apply(acl_func, name_expansion_iterator, acl_excep_handler,
582 fail_on_error=not self.continue_on_error)
583
584 if not self.everything_set_okay and not self.continue_on_error:
585 raise CommandException('ACLs for some objects could not be set.')
586
587 def SetAclFunc(self, name_expansion_result, thread_state=None):
588 """Sets the object ACL for the name_expansion_result provided.
589
590 Args:
591 name_expansion_result: NameExpansionResult describing the target object.
592 thread_state: If present, use this gsutil Cloud API instance for the set.
593 """
594 if thread_state:
595 assert not self.def_acl
596 gsutil_api = thread_state
597 else:
598 gsutil_api = self.gsutil_api
599 op_string = 'default object ACL' if self.def_acl else 'ACL'
600 url = name_expansion_result.expanded_storage_url
601 self.logger.info('Setting %s on %s...', op_string, url)
602 if ((gsutil_api.GetApiSelector(url.scheme) == ApiSelector.XML
603 and url.scheme != 'gs') or self.canned):
604 # If we are using canned ACLs or interacting with a non-google ACL
605 # model, we need to use the XML passthrough. acl_arg should either
606 # be a canned ACL or an XML ACL.
607 try:
608 # No canned ACL support in JSON, force XML API to be used.
609 orig_prefer_api = gsutil_api.prefer_api
610 gsutil_api.prefer_api = ApiSelector.XML
611 gsutil_api.XmlPassThroughSetAcl(
612 self.acl_arg, url, canned=self.canned,
613 def_obj_acl=self.def_acl, provider=url.scheme)
614 gsutil_api.prefer_api = orig_prefer_api
615 except ServiceException as e:
616 if self.continue_on_error:
617 self.everything_set_okay = False
618 self.logger.error(e)
619 else:
620 raise
621 else: # Normal Cloud API path. ACL is a JSON ACL.
622 try:
623 if url.IsBucket():
624 if self.def_acl:
625 def_obj_acl = AclTranslation.JsonToMessage(
626 self.acl_arg, apitools_messages.ObjectAccessControl)
627 bucket_metadata = apitools_messages.Bucket(
628 defaultObjectAcl=def_obj_acl)
629 gsutil_api.PatchBucket(url.bucket_name, bucket_metadata,
630 provider=url.scheme, fields=['id'])
631 else:
632 bucket_acl = AclTranslation.JsonToMessage(
633 self.acl_arg, apitools_messages.BucketAccessControl)
634 bucket_metadata = apitools_messages.Bucket(acl=bucket_acl)
635 gsutil_api.PatchBucket(url.bucket_name, bucket_metadata,
636 provider=url.scheme, fields=['id'])
637 else: # url.IsObject()
638 object_acl = AclTranslation.JsonToMessage(
639 self.acl_arg, apitools_messages.ObjectAccessControl)
640 object_metadata = apitools_messages.Object(acl=object_acl)
641 gsutil_api.PatchObjectMetadata(url.bucket_name, url.object_name,
642 object_metadata, provider=url.scheme,
643 generation=url.generation)
644 except ArgumentException, e:
645 raise
646 except ServiceException, e:
571 raise 647 raise
572 648
573 def SetAclCommandHelper(self): 649 def SetAclCommandHelper(self, acl_func, acl_excep_handler):
650 """Sets ACLs on the self.args using the passed-in acl function.
651
652 Args:
653 acl_func: ACL function to be passed to Apply.
654 acl_excep_handler: ACL exception handler to be passed to Apply.
574 """ 655 """
575 Common logic for setting ACLs. Sets the standard ACL or the default 656 acl_arg = self.args[0]
576 object ACL depending on self.command_name. 657 url_args = self.args[1:]
577 """ 658 # Disallow multi-provider setacl requests, because there are differences in
578
579 self.acl_arg = self.args[0]
580 uri_args = self.args[1:]
581 # Disallow multi-provider acl set requests, because there are differences in
582 # the ACL models. 659 # the ACL models.
583 storage_uri = self.UrisAreForSingleProvider(uri_args) 660 if not UrlsAreForSingleProvider(url_args):
584 if not storage_uri:
585 raise CommandException('"%s" command spanning providers not allowed.' % 661 raise CommandException('"%s" command spanning providers not allowed.' %
586 self.command_name) 662 self.command_name)
587 663
588 # Determine whether acl_arg names a file containing XML ACL text vs. the 664 # Determine whether acl_arg names a file containing XML ACL text vs. the
589 # string name of a canned ACL. 665 # string name of a canned ACL.
590 if os.path.isfile(self.acl_arg): 666 if os.path.isfile(acl_arg):
591 with codecs.open(self.acl_arg, 'r', 'utf-8') as f: 667 with codecs.open(acl_arg, 'r', UTF8) as f:
592 self.acl_arg = f.read() 668 acl_arg = f.read()
593 self.canned = False 669 self.canned = False
594 else: 670 else:
595 # No file exists, so expect a canned ACL string. 671 # No file exists, so expect a canned ACL string.
672 # Canned ACLs are not supported in JSON and we need to use the XML API
673 # to set them.
674 # validate=False because we allow wildcard urls.
675 storage_uri = boto.storage_uri(
676 url_args[0], debug=self.debug, validate=False,
677 bucket_storage_uri_class=self.bucket_storage_uri_class)
678
596 canned_acls = storage_uri.canned_acls() 679 canned_acls = storage_uri.canned_acls()
597 if self.acl_arg not in canned_acls: 680 if acl_arg not in canned_acls:
598 raise CommandException('Invalid canned ACL "%s".' % self.acl_arg) 681 raise CommandException('Invalid canned ACL "%s".' % acl_arg)
599 self.canned = True 682 self.canned = True
600 683
601 # Used to track if any ACLs failed to be set. 684 # Used to track if any ACLs failed to be set.
602 self.everything_set_okay = True 685 self.everything_set_okay = True
686 self.acl_arg = acl_arg
603 687
604 # If user specified -R option, convert any bucket args to bucket wildcards 688 self.ApplyAclFunc(acl_func, acl_excep_handler, url_args)
605 # (e.g., gs://bucket/*), to prevent the operation from being applied to
606 # the buckets themselves.
607 if self.recursion_requested:
608 for i in range(len(uri_args)):
609 uri = self.suri_builder.StorageUri(uri_args[i])
610 if uri.names_bucket():
611 uri_args[i] = uri.clone_replace_name('*').uri
612 else:
613 # Handle bucket ACL setting operations single-threaded, because
614 # our threading machinery currently assumes it's working with objects
615 # (name_expansion_iterator), and normally we wouldn't expect users to need
616 # to set ACLs on huge numbers of buckets at once anyway.
617 for i in range(len(uri_args)):
618 uri_str = uri_args[i]
619 if self.suri_builder.StorageUri(uri_str).names_bucket():
620 self._RunSingleThreadedSetAcl(self.acl_arg, uri_args)
621 return
622
623 name_expansion_iterator = NameExpansionIterator(
624 self.command_name, self.proj_id_handler, self.headers, self.debug,
625 self.logger, self.bucket_storage_uri_class, uri_args,
626 self.recursion_requested, self.recursion_requested,
627 all_versions=self.all_versions)
628 # Perform requests in parallel (-m) mode, if requested, using
629 # configured number of parallel processes and threads. Otherwise,
630 # perform requests with sequential function calls in current process.
631 self.Apply(_SetAclFuncWrapper, name_expansion_iterator,
632 _SetAclExceptionHandler)
633
634 if not self.everything_set_okay and not self.continue_on_error: 689 if not self.everything_set_okay and not self.continue_on_error:
635 raise CommandException('ACLs for some objects could not be set.') 690 raise CommandException('ACLs for some objects could not be set.')
636 691
637 def _RunSingleThreadedSetAcl(self, acl_arg, uri_args): 692 def _WarnServiceAccounts(self):
638 some_matched = False 693 """Warns service account users who have received an AccessDenied error.
639 for uri_str in uri_args:
640 for blr in self.WildcardIterator(uri_str):
641 if blr.HasPrefix():
642 continue
643 some_matched = True
644 uri = blr.GetUri()
645 if self.command_name == 'defacl':
646 self.logger.info('Setting default object ACL on %s...', uri)
647 if self.canned:
648 uri.set_def_acl(acl_arg, uri.object_name, False, self.headers)
649 else:
650 uri.set_def_xml_acl(acl_arg, False, self.headers)
651 else:
652 self.logger.info('Setting ACL on %s...', uri)
653 if self.canned:
654 uri.set_acl(acl_arg, uri.object_name, False, self.headers)
655 else:
656 uri.set_xml_acl(acl_arg, uri.object_name, False, self.headers)
657 if not some_matched:
658 raise CommandException('No URIs matched')
659 694
660 def _WarnServiceAccounts(self): 695 When one of the metadata-related commands fails due to AccessDenied, user
661 """Warns service account users who have received an AccessDenied error for 696 must ensure that they are listed as an Owner in the API console.
662 one of the metadata-related commands to make sure that they are listed as 697 """
663 Owners in the API console.""" 698 # Import this here so that the value will be set first in
664 699 # gcs_oauth2_boto_plugin.
665 # Import this here so that the value will be set first in oauth2_plugin. 700 # pylint: disable=g-import-not-at-top
666 from gslib.third_party.oauth2_plugin.oauth2_plugin import IS_SERVICE_ACCOUNT 701 from gcs_oauth2_boto_plugin.oauth2_plugin import IS_SERVICE_ACCOUNT
667 702
668 if IS_SERVICE_ACCOUNT: 703 if IS_SERVICE_ACCOUNT:
669 # This method is only called when canned ACLs are used, so the warning 704 # This method is only called when canned ACLs are used, so the warning
670 # definitely applies. 705 # definitely applies.
671 self.logger.warning('\n'.join(textwrap.wrap( 706 self.logger.warning('\n'.join(textwrap.wrap(
672 'It appears that your service account has been denied access while ' 707 'It appears that your service account has been denied access while '
673 'attempting to perform a metadata operation. If you believe that you ' 708 'attempting to perform a metadata operation. If you believe that you '
674 'should have access to this metadata (i.e., if it is associated with ' 709 'should have access to this metadata (i.e., if it is associated with '
675 'your account), please make sure that your service account''s email ' 710 'your account), please make sure that your service account''s email '
676 'address is listed as an Owner in the Team tab of the API console. ' 711 'address is listed as an Owner in the Team tab of the API console. '
677 'See "gsutil help creds" for further information.\n'))) 712 'See "gsutil help creds" for further information.\n')))
678 713
679 def GetAclCommandHelper(self): 714 def GetAndPrintAcl(self, url_str):
680 """Common logic for getting ACLs. Gets the standard ACL or the default 715 """Prints the standard or default object ACL depending on self.command_name.
681 object ACL depending on self.command_name."""
682
683 # Resolve to just one object.
684 # Handle wildcard-less URI specially in case this is a version-specific
685 # URI, because WildcardIterator().IterUris() would lose the versioning info.
686 if not ContainsWildcard(self.args[0]):
687 uri = self.suri_builder.StorageUri(self.args[0])
688 else:
689 uris = list(self.WildcardIterator(self.args[0]).IterUris())
690 if len(uris) == 0:
691 raise CommandException('No URIs matched')
692 if len(uris) != 1:
693 raise CommandException('%s matched more than one URI, which is not '
694 'allowed by the %s command' % (self.args[0], self.command_name))
695 uri = uris[0]
696 if not uri.names_bucket() and not uri.names_object():
697 raise CommandException('"%s" command must specify a bucket or '
698 'object.' % self.command_name)
699 if self.command_name == 'defacl':
700 acl = uri.get_def_acl(False, self.headers)
701 else:
702 acl = uri.get_acl(False, self.headers)
703 # Pretty-print the XML to make it more easily human editable.
704 parsed_xml = xml.dom.minidom.parseString(acl.to_xml().encode('utf-8'))
705 print parsed_xml.toprettyxml(indent=' ').encode('utf-8')
706
707 def GetXmlSubresource(self, subresource, uri_arg):
708 """Print an xml subresource, e.g. logging, for a bucket/object.
709 716
710 Args: 717 Args:
711 subresource: The subresource name. 718 url_str: URL string to get ACL for.
712 uri_arg: URI for the bucket/object. Wildcards will be expanded. 719 """
720 blr = self.GetAclCommandBucketListingReference(url_str)
721 url = StorageUrlFromString(url_str)
722 if (self.gsutil_api.GetApiSelector(url.scheme) == ApiSelector.XML
723 and url.scheme != 'gs'):
724 # Need to use XML passthrough.
725 try:
726 acl = self.gsutil_api.XmlPassThroughGetAcl(
727 url, def_obj_acl=self.def_acl, provider=url.scheme)
728 print acl.to_xml()
729 except AccessDeniedException, _:
730 self._WarnServiceAccounts()
731 raise
732 else:
733 if self.command_name == 'defacl':
734 acl = blr.root_object.defaultObjectAcl
735 if not acl:
736 self.logger.warn(
737 'No default object ACL present for %s. This could occur if '
738 'the default object ACL is private, in which case objects '
739 'created in this bucket will be readable only by their '
740 'creators. It could also mean you do not have OWNER permission '
741 'on %s and therefore do not have permission to read the '
742 'default object ACL.', url_str, url_str)
743 else:
744 acl = blr.root_object.acl
745 if not acl:
746 self._WarnServiceAccounts()
747 raise AccessDeniedException('Access denied. Please ensure you have '
748 'OWNER permission on %s.' % url_str)
749 print AclTranslation.JsonFromMessage(acl)
750
751 def GetAclCommandBucketListingReference(self, url_str):
752 """Gets a single bucket listing reference for an acl get command.
753
754 Args:
755 url_str: URL string to get the bucket listing reference for.
756
757 Returns:
758 BucketListingReference for the URL string.
713 759
714 Raises: 760 Raises:
715 CommandException: if errors encountered. 761 CommandException if string did not result in exactly one reference.
716 """ 762 """
717 # Wildcarding is allowed but must resolve to just one bucket. 763 # We're guaranteed by caller that we have the appropriate type of url
718 uris = list(self.WildcardIterator(uri_arg).IterUris()) 764 # string for the call (ex. we will never be called with an object string
719 if len(uris) != 1: 765 # by getdefacl)
720 raise CommandException('Wildcards must resolve to exactly one item for ' 766 wildcard_url = StorageUrlFromString(url_str)
721 'get %s' % subresource) 767 if wildcard_url.IsObject():
722 uri = uris[0] 768 plurality_iter = PluralityCheckableIterator(
723 xml_str = uri.get_subresource(subresource, False, self.headers) 769 self.WildcardIterator(url_str).IterObjects(
724 # Pretty-print the XML to make it more easily human editable. 770 bucket_listing_fields=['acl']))
725 parsed_xml = xml.dom.minidom.parseString(xml_str.encode('utf-8')) 771 else:
726 print parsed_xml.toprettyxml(indent=' ') 772 # Bucket or provider. We call IterBuckets explicitly here to ensure that
773 # the root object is populated with the acl.
774 if self.command_name == 'defacl':
775 bucket_fields = ['defaultObjectAcl']
776 else:
777 bucket_fields = ['acl']
778 plurality_iter = PluralityCheckableIterator(
779 self.WildcardIterator(url_str).IterBuckets(
780 bucket_fields=bucket_fields))
781 if plurality_iter.IsEmpty():
782 raise CommandException('No URLs matched')
783 if plurality_iter.HasPlurality():
784 raise CommandException(
785 '%s matched more than one URL, which is not allowed by the %s '
786 'command' % (url_str, self.command_name))
787 return list(plurality_iter)[0]
727 788
728 def _HandleMultiProcessingControlC(self, signal_num, cur_stack_frame): 789 def _HandleMultiProcessingControlC(self, unused_signal_num,
729 """Called when user hits ^C during a multi-processing/multi-threaded 790 unused_cur_stack_frame):
730 request, so we can kill the subprocesses.""" 791 """Called when user hits ^C during a multi-process/multi-thread request.
792
793 Kills subprocesses.
794
795 Args:
796 unused_signal_num: signal generated by ^C.
797 unused_cur_stack_frame: Current stack frame.
798 """
731 # Note: This only works under Linux/MacOS. See 799 # Note: This only works under Linux/MacOS. See
732 # https://github.com/GoogleCloudPlatform/gsutil/issues/99 for details 800 # https://github.com/GoogleCloudPlatform/gsutil/issues/99 for details
733 # about why making it work correctly across OS's is harder and still open. 801 # about why making it work correctly across OS's is harder and still open.
734 ShutDownGsutil() 802 ShutDownGsutil()
735 sys.stderr.write('Caught ^C - exiting\n') 803 sys.stderr.write('Caught ^C - exiting\n')
736 # Simply calling sys.exit(1) doesn't work - see above bug for details. 804 # Simply calling sys.exit(1) doesn't work - see above bug for details.
737 KillProcess(os.getpid()) 805 KillProcess(os.getpid())
738 806
739 def HaveFileUris(self, args_to_check): 807 def GetSingleBucketUrlFromArg(self, arg, bucket_fields=None):
740 """Checks whether args_to_check contain any file URIs. 808 """Gets a single bucket URL based on the command arguments.
741 809
742 Args: 810 Args:
743 args_to_check: Command-line argument subset to check. 811 arg: String argument to get bucket URL for.
812 bucket_fields: Fields to populate for the bucket.
744 813
745 Returns: 814 Returns:
746 True if args_to_check contains any file URIs. 815 (StorageUrl referring to a single bucket, Bucket metadata).
816
817 Raises:
818 CommandException if args did not match exactly one bucket.
747 """ 819 """
748 for uri_str in args_to_check: 820 plurality_checkable_iterator = self.GetBucketUrlIterFromArg(
749 if uri_str.lower().startswith('file://') or uri_str.find(':') == -1: 821 arg, bucket_fields=bucket_fields)
750 return True 822 if plurality_checkable_iterator.HasPlurality():
751 return False 823 raise CommandException(
824 '%s matched more than one URL, which is not\n'
825 'allowed by the %s command' % (arg, self.command_name))
826 blr = list(plurality_checkable_iterator)[0]
827 return StorageUrlFromString(blr.url_string), blr.root_object
828
829 def GetBucketUrlIterFromArg(self, arg, bucket_fields=None):
830 """Gets a single bucket URL based on the command arguments.
831
832 Args:
833 arg: String argument to iterate over.
834 bucket_fields: Fields to populate for the bucket.
835
836 Returns:
837 PluralityCheckableIterator over buckets.
838
839 Raises:
840 CommandException if iterator matched no buckets.
841 """
842 arg_url = StorageUrlFromString(arg)
843 if not arg_url.IsCloudUrl() or arg_url.IsObject():
844 raise CommandException('"%s" command must specify a bucket' %
845 self.command_name)
846
847 plurality_checkable_iterator = PluralityCheckableIterator(
848 self.WildcardIterator(arg).IterBuckets(
849 bucket_fields=bucket_fields))
850 if plurality_checkable_iterator.IsEmpty():
851 raise CommandException('No URLs matched')
852 return plurality_checkable_iterator
752 853
753 ###################### 854 ######################
754 # Private functions. # 855 # Private functions. #
755 ###################### 856 ######################
756 857
757 def _HaveProviderUris(self, args_to_check):
758 """Checks whether args_to_check contains any provider URIs (like 'gs://').
759
760 Args:
761 args_to_check: Command-line argument subset to check.
762
763 Returns:
764 True if args_to_check contains any provider URIs.
765 """
766 for uri_str in args_to_check:
767 if re.match('^[a-z]+://$', uri_str):
768 return True
769 return False
770
771 def _ResetConnectionPool(self): 858 def _ResetConnectionPool(self):
772 # Each OS process needs to establish its own set of connections to 859 # Each OS process needs to establish its own set of connections to
773 # the server to avoid writes from different OS processes interleaving 860 # the server to avoid writes from different OS processes interleaving
774 # onto the same socket (and garbling the underlying SSL session). 861 # onto the same socket (and garbling the underlying SSL session).
775 # We ensure each process gets its own set of connections here by 862 # We ensure each process gets its own set of connections here by
776 # closing all connections in the storage provider connection pool. 863 # closing all connections in the storage provider connection pool.
777 connection_pool = StorageUri.provider_pool 864 connection_pool = StorageUri.provider_pool
778 if connection_pool: 865 if connection_pool:
779 for i in connection_pool: 866 for i in connection_pool:
780 connection_pool[i].connection.close() 867 connection_pool[i].connection.close()
781 868
782 def _GetProcessAndThreadCount(self, process_count, thread_count, 869 def _GetProcessAndThreadCount(self, process_count, thread_count,
783 parallel_operations_override): 870 parallel_operations_override):
784 """ 871 """Determines the values of process_count and thread_count.
785 Determines the values of process_count and thread_count that we should 872
786 actually use. If we're not performing operations in parallel, then ignore 873 These values are used for parallel operations.
874 If we're not performing operations in parallel, then ignore
787 existing values and use process_count = thread_count = 1. 875 existing values and use process_count = thread_count = 1.
788 876
789 Args: 877 Args:
790 process_count: A positive integer or None. In the latter case, we read 878 process_count: A positive integer or None. In the latter case, we read
791 the value from the .boto config file. 879 the value from the .boto config file.
792 thread_count: A positive integer or None. In the latter case, we read 880 thread_count: A positive integer or None. In the latter case, we read
793 the value from the .boto config file. 881 the value from the .boto config file.
794 parallel_operations_override: Used to override self.parallel_operations. 882 parallel_operations_override: Used to override self.parallel_operations.
795 This allows the caller to safely override 883 This allows the caller to safely override
796 the top-level flag for a single call. 884 the top-level flag for a single call.
(...skipping 18 matching lines...) Expand all
815 gslib.commands.config.DEFAULT_PARALLEL_THREAD_COUNT) 903 gslib.commands.config.DEFAULT_PARALLEL_THREAD_COUNT)
816 if thread_count < 1: 904 if thread_count < 1:
817 raise CommandException('Invalid parallel_thread_count "%d".' % 905 raise CommandException('Invalid parallel_thread_count "%d".' %
818 thread_count) 906 thread_count)
819 else: 907 else:
820 # If -m not specified, then assume 1 OS process and 1 Python thread. 908 # If -m not specified, then assume 1 OS process and 1 Python thread.
821 process_count = 1 909 process_count = 1
822 thread_count = 1 910 thread_count = 1
823 911
824 if IS_WINDOWS and process_count > 1: 912 if IS_WINDOWS and process_count > 1:
825 raise CommandException('\n'.join(textwrap.wrap(( 913 raise CommandException('\n'.join(textwrap.wrap(
826 'It is not possible to set process_count > 1 on Windows. Please ' 914 ('It is not possible to set process_count > 1 on Windows. Please '
827 'update your config file (located at %s) and set ' 915 'update your config file (located at %s) and set '
828 '"process_count = 1".') % 916 '"parallel_process_count = 1".') %
829 GetConfigFilePath()))) 917 GetConfigFilePath())))
830 self.logger.debug('process count: %d', process_count) 918 self.logger.debug('process count: %d', process_count)
831 self.logger.debug('thread count: %d', thread_count) 919 self.logger.debug('thread count: %d', thread_count)
832 920
833 return (process_count, thread_count) 921 return (process_count, thread_count)
834 922
835 def _SetUpPerCallerState(self): 923 def _SetUpPerCallerState(self):
836 """Set up the state for a caller id, corresponding to one Apply call.""" 924 """Set up the state for a caller id, corresponding to one Apply call."""
837 # Get a new caller ID. 925 # Get a new caller ID.
838 with caller_id_lock: 926 with caller_id_lock:
839 caller_id_counter.value = caller_id_counter.value + 1 927 caller_id_counter.value += 1
840 caller_id = caller_id_counter.value 928 caller_id = caller_id_counter.value
841 929
842 # Create a copy of self with an incremented recursive level. This allows 930 # Create a copy of self with an incremented recursive level. This allows
843 # the class to report its level correctly if the function called from it 931 # the class to report its level correctly if the function called from it
844 # also needs to call Apply. 932 # also needs to call Apply.
845 cls = copy.copy(self) 933 cls = copy.copy(self)
846 cls.recursive_apply_level += 1 934 cls.recursive_apply_level += 1
847 935
848 # Thread-safe loggers can't be pickled, so we will remove it here and 936 # Thread-safe loggers can't be pickled, so we will remove it here and
849 # recreate it later in the WorkerThread. This is not a problem since any 937 # recreate it later in the WorkerThread. This is not a problem since any
850 # logger with the same name will be treated as a singleton. 938 # logger with the same name will be treated as a singleton.
851 cls.logger = None 939 cls.logger = None
940
941 # Likewise, the default API connection can't be pickled, but it is unused
942 # anyway as each thread gets its own API delegator.
943 cls.gsutil_api = None
944
852 class_map[caller_id] = cls 945 class_map[caller_id] = cls
853
854 total_tasks[caller_id] = -1 # -1 => the producer hasn't finished yet. 946 total_tasks[caller_id] = -1 # -1 => the producer hasn't finished yet.
855 call_completed_map[caller_id] = False 947 call_completed_map[caller_id] = False
856 caller_id_finished_count.put(caller_id, 0) 948 caller_id_finished_count.Put(caller_id, 0)
857 global_return_values_map.put(caller_id, []) 949 global_return_values_map.Put(caller_id, [])
858 return caller_id 950 return caller_id
859 951
860 def _CreateNewConsumerPool(self, num_processes, num_threads): 952 def _CreateNewConsumerPool(self, num_processes, num_threads):
861 """Create a new pool of processes that call _ApplyThreads.""" 953 """Create a new pool of processes that call _ApplyThreads."""
862 processes = [] 954 processes = []
863 task_queue = _NewMultiprocessingQueue() 955 task_queue = _NewMultiprocessingQueue()
864 task_queues.append(task_queue) 956 task_queues.append(task_queue)
865 957
866 current_max_recursive_level.value = current_max_recursive_level.value + 1 958 current_max_recursive_level.value += 1
867 if current_max_recursive_level.value > MAX_RECURSIVE_DEPTH: 959 if current_max_recursive_level.value > MAX_RECURSIVE_DEPTH:
868 raise CommandException('Recursion depth of Apply calls is too great.') 960 raise CommandException('Recursion depth of Apply calls is too great.')
869 for shard in range(num_processes): 961 for _ in range(num_processes):
870 recursive_apply_level = len(consumer_pools) 962 recursive_apply_level = len(consumer_pools)
871 p = multiprocessing.Process( 963 p = multiprocessing.Process(
872 target=self._ApplyThreads, 964 target=self._ApplyThreads,
873 args=(num_threads, recursive_apply_level, shard)) 965 args=(num_threads, num_processes, recursive_apply_level))
874 p.daemon = True 966 p.daemon = True
875 processes.append(p) 967 processes.append(p)
876 p.start() 968 p.start()
877 consumer_pool = _ConsumerPool(processes, task_queue) 969 consumer_pool = _ConsumerPool(processes, task_queue)
878 consumer_pools.append(consumer_pool) 970 consumer_pools.append(consumer_pool)
879 971
880 def Apply(self, func, args_iterator, exception_handler, 972 def Apply(self, func, args_iterator, exception_handler,
881 shared_attrs=None, arg_checker=_UriArgChecker, 973 shared_attrs=None, arg_checker=_UrlArgChecker,
882 parallel_operations_override=False, process_count=None, 974 parallel_operations_override=False, process_count=None,
883 thread_count=None, should_return_results=False, 975 thread_count=None, should_return_results=False,
884 fail_on_error=False): 976 fail_on_error=False):
885 """ 977 """Calls _Parallel/SequentialApply based on multiprocessing availability.
886 Determines whether the necessary parts of the multiprocessing module are
887 available, and delegates to _ParallelApply or _SequentialApply as
888 appropriate.
889 978
890 Args: 979 Args:
891 func: Function to call to process each argument. 980 func: Function to call to process each argument.
892 args_iterator: Iterable collection of arguments to be put into the 981 args_iterator: Iterable collection of arguments to be put into the
893 work queue. 982 work queue.
894 exception_handler: Exception handler for WorkerThread class. 983 exception_handler: Exception handler for WorkerThread class.
895 shared_attrs: List of attributes to manage across sub-processes. 984 shared_attrs: List of attributes to manage across sub-processes.
896 arg_checker: Used to determine whether we should process the current 985 arg_checker: Used to determine whether we should process the current
897 argument or simply skip it. Also handles any logging that 986 argument or simply skip it. Also handles any logging that
898 is specific to a particular type of argument. 987 is specific to a particular type of argument.
899 parallel_operations_override: Used to override self.parallel_operations. 988 parallel_operations_override: Used to override self.parallel_operations.
900 This allows the caller to safely override 989 This allows the caller to safely override
901 the top-level flag for a single call. 990 the top-level flag for a single call.
902 process_count: The number of processes to use. If not specified, then 991 process_count: The number of processes to use. If not specified, then
903 the configured default will be used. 992 the configured default will be used.
904 thread_count: The number of threads per process. If not speficied, then 993 thread_count: The number of threads per process. If not speficied, then
905 the configured default will be used.. 994 the configured default will be used..
906 should_return_results: If true, then return the results of all successful 995 should_return_results: If true, then return the results of all successful
907 calls to func in a list. 996 calls to func in a list.
908 fail_on_error: If true, then raise any exceptions encountered when 997 fail_on_error: If true, then raise any exceptions encountered when
909 executing func. This is only applicable in the case of 998 executing func. This is only applicable in the case of
910 process_count == thread_count == 1. 999 process_count == thread_count == 1.
1000
1001 Returns:
1002 Results from spawned threads.
911 """ 1003 """
912 if shared_attrs: 1004 if shared_attrs:
913 original_shared_vars_values = {} # We'll add these back in at the end. 1005 original_shared_vars_values = {} # We'll add these back in at the end.
914 for name in shared_attrs: 1006 for name in shared_attrs:
915 original_shared_vars_values[name] = getattr(self, name) 1007 original_shared_vars_values[name] = getattr(self, name)
916 # By setting this to 0, we simplify the logic for computing deltas. 1008 # By setting this to 0, we simplify the logic for computing deltas.
917 # We'll add it back after all of the tasks have been performed. 1009 # We'll add it back after all of the tasks have been performed.
918 setattr(self, name, 0) 1010 setattr(self, name, 0)
919 1011
920 (process_count, thread_count) = self._GetProcessAndThreadCount( 1012 (process_count, thread_count) = self._GetProcessAndThreadCount(
921 process_count, thread_count, parallel_operations_override) 1013 process_count, thread_count, parallel_operations_override)
1014
922 is_main_thread = (self.recursive_apply_level == 0 1015 is_main_thread = (self.recursive_apply_level == 0
923 and self.sequential_caller_id == -1) 1016 and self.sequential_caller_id == -1)
924 1017
925 # We don't honor the fail_on_error flag in the case of multiple threads 1018 # We don't honor the fail_on_error flag in the case of multiple threads
926 # or processes. 1019 # or processes.
927 fail_on_error = fail_on_error and (process_count * thread_count == 1) 1020 fail_on_error = fail_on_error and (process_count * thread_count == 1)
928 1021
929 # Only check this from the first call in the main thread. Apart from the 1022 # Only check this from the first call in the main thread. Apart from the
930 # fact that it's wasteful to try this multiple times in general, it also 1023 # fact that it's wasteful to try this multiple times in general, it also
931 # will never work when called from a subprocess since we use daemon 1024 # will never work when called from a subprocess since we use daemon
932 # processes, and daemons can't create other processes. 1025 # processes, and daemons can't create other processes.
933 if is_main_thread: 1026 if is_main_thread:
934 if ((not self.multiprocessing_is_available) 1027 if ((not self.multiprocessing_is_available)
935 and thread_count * process_count > 1): 1028 and thread_count * process_count > 1):
936 # Run the check again and log the appropriate warnings. This was run 1029 # Run the check again and log the appropriate warnings. This was run
937 # before, when the Command object was created, in order to calculate 1030 # before, when the Command object was created, in order to calculate
938 # self.multiprocessing_is_available, but we don't want to print the 1031 # self.multiprocessing_is_available, but we don't want to print the
939 # warning until we're sure the user actually tried to use multiple 1032 # warning until we're sure the user actually tried to use multiple
940 # threads or processes. 1033 # threads or processes.
941 MultiprocessingIsAvailable(logger=self.logger) 1034 MultiprocessingIsAvailable(logger=self.logger)
942 1035
943 if self.multiprocessing_is_available: 1036 if self.multiprocessing_is_available:
944 caller_id = self._SetUpPerCallerState() 1037 caller_id = self._SetUpPerCallerState()
945 else: 1038 else:
946 self.sequential_caller_id += 1 1039 self.sequential_caller_id += 1
947 caller_id = self.sequential_caller_id 1040 caller_id = self.sequential_caller_id
948 1041
949 if is_main_thread: 1042 if is_main_thread:
950 global global_return_values_map, shared_vars_map 1043 # pylint: disable=global-variable-undefined
1044 global global_return_values_map, shared_vars_map, failure_count
951 global caller_id_finished_count, shared_vars_list_map 1045 global caller_id_finished_count, shared_vars_list_map
952 global_return_values_map = BasicIncrementDict() 1046 global_return_values_map = BasicIncrementDict()
953 global_return_values_map.put(caller_id, []) 1047 global_return_values_map.Put(caller_id, [])
954 shared_vars_map = BasicIncrementDict() 1048 shared_vars_map = BasicIncrementDict()
955 caller_id_finished_count = BasicIncrementDict() 1049 caller_id_finished_count = BasicIncrementDict()
956 shared_vars_list_map = {} 1050 shared_vars_list_map = {}
957 1051 failure_count = 0
958 1052
959 # If any shared attributes passed by caller, create a dictionary of 1053 # If any shared attributes passed by caller, create a dictionary of
960 # shared memory variables for every element in the list of shared 1054 # shared memory variables for every element in the list of shared
961 # attributes. 1055 # attributes.
962 if shared_attrs: 1056 if shared_attrs:
963 shared_vars_list_map[caller_id] = shared_attrs 1057 shared_vars_list_map[caller_id] = shared_attrs
964 for name in shared_attrs: 1058 for name in shared_attrs:
965 shared_vars_map.put((caller_id, name), 0) 1059 shared_vars_map.Put((caller_id, name), 0)
966 1060
967 # Make all of the requested function calls. 1061 # Make all of the requested function calls.
968 if self.multiprocessing_is_available and thread_count * process_count > 1: 1062 if self.multiprocessing_is_available and thread_count * process_count > 1:
969 self._ParallelApply(func, args_iterator, exception_handler, caller_id, 1063 self._ParallelApply(func, args_iterator, exception_handler, caller_id,
970 arg_checker, parallel_operations_override, 1064 arg_checker, process_count, thread_count,
971 process_count, thread_count, should_return_results, 1065 should_return_results, fail_on_error)
972 fail_on_error)
973 else: 1066 else:
974 self._SequentialApply(func, args_iterator, exception_handler, caller_id, 1067 self._SequentialApply(func, args_iterator, exception_handler, caller_id,
975 arg_checker, should_return_results, fail_on_error) 1068 arg_checker, should_return_results, fail_on_error)
976 1069
977 if shared_attrs: 1070 if shared_attrs:
978 for name in shared_attrs: 1071 for name in shared_attrs:
979 # This allows us to retain the original value of the shared variable, 1072 # This allows us to retain the original value of the shared variable,
980 # and simply apply the delta after what was done during the call to 1073 # and simply apply the delta after what was done during the call to
981 # apply. 1074 # apply.
982 final_value = (original_shared_vars_values[name] + 1075 final_value = (original_shared_vars_values[name] +
983 shared_vars_map.get((caller_id, name))) 1076 shared_vars_map.Get((caller_id, name)))
984 setattr(self, name, final_value) 1077 setattr(self, name, final_value)
985 1078
986 if should_return_results: 1079 if should_return_results:
987 return global_return_values_map.get(caller_id) 1080 return global_return_values_map.Get(caller_id)
988 1081
1082 def _MaybeSuggestGsutilDashM(self):
1083 """Outputs a sugestion to the user to use gsutil -m."""
1084 if not (boto.config.getint('GSUtil', 'parallel_process_count', 0) == 1 and
1085 boto.config.getint('GSUtil', 'parallel_thread_count', 0) == 1):
1086 self.logger.warn('\n' + textwrap.fill(
1087 '==> NOTE: You are performing a sequence of gsutil operations that '
1088 'may run significantly faster if you instead use gsutil -m %s ...\n'
1089 'Please see the -m section under "gsutil help options" for further '
1090 'information about when gsutil -m can be advantageous.'
1091 % sys.argv[1]) + '\n')
1092
1093 # pylint: disable=g-doc-args
989 def _SequentialApply(self, func, args_iterator, exception_handler, caller_id, 1094 def _SequentialApply(self, func, args_iterator, exception_handler, caller_id,
990 arg_checker, should_return_results, fail_on_error): 1095 arg_checker, should_return_results, fail_on_error):
1096 """Performs all function calls sequentially in the current thread.
1097
1098 No other threads or processes will be spawned. This degraded functionality
1099 is used when the multiprocessing module is not available or the user
1100 requests only one thread and one process.
991 """ 1101 """
992 Perform all function calls sequentially in the current thread. No other
993 threads or processes will be spawned. This degraded functionality is only
994 for use when the multiprocessing module is not available for some reason.
995 """
996
997 # Create a WorkerThread to handle all of the logic needed to actually call 1102 # Create a WorkerThread to handle all of the logic needed to actually call
998 # the function. Note that this thread will never be started, and all work 1103 # the function. Note that this thread will never be started, and all work
999 # is done in the current thread. 1104 # is done in the current thread.
1000 worker_thread = WorkerThread(None, False) 1105 worker_thread = WorkerThread(None, False)
1001 args_iterator = iter(args_iterator) 1106 args_iterator = iter(args_iterator)
1107 # Count of sequential calls that have been made. Used for producing
1108 # suggestion to use gsutil -m.
1109 sequential_call_count = 0
1002 while True: 1110 while True:
1003 1111
1004 # Try to get the next argument, handling any exceptions that arise. 1112 # Try to get the next argument, handling any exceptions that arise.
1005 try: 1113 try:
1006 args = args_iterator.next() 1114 args = args_iterator.next()
1007 except StopIteration, e: 1115 except StopIteration, e:
1008 break 1116 break
1009 except Exception, e: 1117 except Exception, e: # pylint: disable=broad-except
1118 _IncrementFailureCount()
1010 if fail_on_error: 1119 if fail_on_error:
1011 raise 1120 raise
1012 else: 1121 else:
1013 try: 1122 try:
1014 exception_handler(self, e) 1123 exception_handler(self, e)
1015 except Exception, e1: 1124 except Exception, _: # pylint: disable=broad-except
1016 self.logger.debug( 1125 self.logger.debug(
1017 'Caught exception while handling exception for %s:\n%s', 1126 'Caught exception while handling exception for %s:\n%s',
1018 func, traceback.format_exc()) 1127 func, traceback.format_exc())
1019 continue 1128 continue
1129
1130 sequential_call_count += 1
1131 if sequential_call_count == OFFER_GSUTIL_M_SUGGESTION_THRESHOLD:
1132 # Output suggestion near beginning of run, so user sees it early and can
1133 # ^C and try gsutil -m.
1134 self._MaybeSuggestGsutilDashM()
1020 if arg_checker(self, args): 1135 if arg_checker(self, args):
1021 # Now that we actually have the next argument, perform the task. 1136 # Now that we actually have the next argument, perform the task.
1022 task = Task(func, args, caller_id, exception_handler, 1137 task = Task(func, args, caller_id, exception_handler,
1023 should_return_results, arg_checker, fail_on_error) 1138 should_return_results, arg_checker, fail_on_error)
1024 worker_thread.PerformTask(task, self) 1139 worker_thread.PerformTask(task, self)
1140 if sequential_call_count >= gslib.util.GetTermLines():
1141 # Output suggestion at end of long run, in case user missed it at the
1142 # start and it scrolled off-screen.
1143 self._MaybeSuggestGsutilDashM()
1025 1144
1145 # pylint: disable=g-doc-args
1026 def _ParallelApply(self, func, args_iterator, exception_handler, caller_id, 1146 def _ParallelApply(self, func, args_iterator, exception_handler, caller_id,
1027 arg_checker, parallel_operations_override, process_count, 1147 arg_checker, process_count, thread_count,
1028 thread_count, should_return_results, fail_on_error): 1148 should_return_results, fail_on_error):
1029 """ 1149 """Dispatches input arguments across a thread/process pool.
1030 Dispatch input arguments across a pool of parallel OS 1150
1031 processes and/or Python threads, based on options (-m or not) 1151 Pools are composed of parallel OS processes and/or Python threads,
1032 and settings in the user's config file. If non-parallel mode 1152 based on options (-m or not) and settings in the user's config file.
1033 or only one OS process requested, execute requests sequentially 1153
1034 in the current OS process. 1154 If only one OS process is requested/available, dispatch requests across
1035 1155 threads in the current OS process.
1156
1036 In the multi-process case, we will create one pool of worker processes for 1157 In the multi-process case, we will create one pool of worker processes for
1037 each level of the tree of recursive calls to Apply. E.g., if A calls 1158 each level of the tree of recursive calls to Apply. E.g., if A calls
1038 Apply(B), and B ultimately calls Apply(C) followed by Apply(D), then we 1159 Apply(B), and B ultimately calls Apply(C) followed by Apply(D), then we
1039 will only create two sets of worker processes - B will execute in the first, 1160 will only create two sets of worker processes - B will execute in the first,
1040 and C and D will execute in the second. If C is then changed to call 1161 and C and D will execute in the second. If C is then changed to call
1041 Apply(E) and D is changed to call Apply(F), then we will automatically 1162 Apply(E) and D is changed to call Apply(F), then we will automatically
1042 create a third set of processes (lazily, when needed) that will be used to 1163 create a third set of processes (lazily, when needed) that will be used to
1043 execute calls to E and F. This might look something like: 1164 execute calls to E and F. This might look something like:
1044 1165
1045 Pool1 Executes: B 1166 Pool1 Executes: B
1046 / \ 1167 / \
1047 Pool2 Executes: C D 1168 Pool2 Executes: C D
1048 / \ 1169 / \
1049 Pool3 Executes: E F 1170 Pool3 Executes: E F
1050 1171
1051 Apply's parallelism is generally broken up into 4 cases: 1172 Apply's parallelism is generally broken up into 4 cases:
1052 - If process_count == thread_count == 1, then all tasks will be executed 1173 - If process_count == thread_count == 1, then all tasks will be executed
1053 in this thread. 1174 by _SequentialApply.
1054 - If process_count > 1 and thread_count == 1, then the main thread will 1175 - If process_count > 1 and thread_count == 1, then the main thread will
1055 create a new pool of processes (if they don't already exist) and each of 1176 create a new pool of processes (if they don't already exist) and each of
1056 those processes will execute the tasks in a single thread. 1177 those processes will execute the tasks in a single thread.
1057 - If process_count == 1 and thread_count > 1, then this process will create 1178 - If process_count == 1 and thread_count > 1, then this process will create
1058 a new pool of threads to execute the tasks. 1179 a new pool of threads to execute the tasks.
1059 - If process_count > 1 and thread_count > 1, then the main thread will 1180 - If process_count > 1 and thread_count > 1, then the main thread will
1060 create a new pool of processes (if they don't already exist) and each of 1181 create a new pool of processes (if they don't already exist) and each of
1061 those processes will, upon creation, create a pool of threads to 1182 those processes will, upon creation, create a pool of threads to
1062 execute the tasks. 1183 execute the tasks.
1063 1184
1064 Args: 1185 Args:
1065 caller_id: The caller ID unique to this call to command.Apply. 1186 caller_id: The caller ID unique to this call to command.Apply.
1066 See command.Apply for description of other arguments. 1187 See command.Apply for description of other arguments.
1067 """ 1188 """
1068 is_main_thread = self.recursive_apply_level == 0 1189 is_main_thread = self.recursive_apply_level == 0
1069 1190
1070 # Catch ^C under Linux/MacOs so we can do cleanup before exiting. 1191 # Catch ^C under Linux/MacOs so we can do cleanup before exiting.
1071 if not IS_WINDOWS and is_main_thread: 1192 if not IS_WINDOWS and is_main_thread:
1072 signal.signal(signal.SIGINT, self._HandleMultiProcessingControlC) 1193 signal.signal(signal.SIGINT, self._HandleMultiProcessingControlC)
1073 1194
1074 if not len(task_queues): 1195 if not task_queues:
1075 # The process we create will need to access the next recursive level 1196 # The process we create will need to access the next recursive level
1076 # of task queues if it makes a call to Apply, so we always keep around 1197 # of task queues if it makes a call to Apply, so we always keep around
1077 # one more queue than we know we need. OTOH, if we don't create a new 1198 # one more queue than we know we need. OTOH, if we don't create a new
1078 # process, the existing process still needs a task queue to use. 1199 # process, the existing process still needs a task queue to use.
1079 task_queues.append(_NewMultiprocessingQueue()) 1200 task_queues.append(_NewMultiprocessingQueue())
1080 1201
1081 if process_count > 1: # Handle process pool creation. 1202 if process_count > 1: # Handle process pool creation.
1082 # Check whether this call will need a new set of workers. 1203 # Check whether this call will need a new set of workers.
1083 with need_pool_or_done_cond: 1204
1205 # Each worker must acquire a shared lock before notifying the main thread
1206 # that it needs a new worker pool, so that at most one worker asks for
1207 # a new worker pool at once.
1208 try:
1209 if not is_main_thread:
1210 worker_checking_level_lock.acquire()
1084 if self.recursive_apply_level >= current_max_recursive_level.value: 1211 if self.recursive_apply_level >= current_max_recursive_level.value:
1085 # Only the main thread is allowed to create new processes - otherwise, 1212 with need_pool_or_done_cond:
1086 # we will run into some Python bugs. 1213 # Only the main thread is allowed to create new processes -
1087 if is_main_thread: 1214 # otherwise, we will run into some Python bugs.
1088 self._CreateNewConsumerPool(process_count, thread_count) 1215 if is_main_thread:
1089 else: 1216 self._CreateNewConsumerPool(process_count, thread_count)
1090 # Notify the main thread that we need a new consumer pool. 1217 else:
1091 new_pool_needed.value = 1 1218 # Notify the main thread that we need a new consumer pool.
1092 need_pool_or_done_cond.notify_all() 1219 new_pool_needed.value = 1
1093 # The main thread will notify us when it finishes. 1220 need_pool_or_done_cond.notify_all()
1094 need_pool_or_done_cond.wait() 1221 # The main thread will notify us when it finishes.
1222 need_pool_or_done_cond.wait()
1223 finally:
1224 if not is_main_thread:
1225 worker_checking_level_lock.release()
1095 1226
1096 # If we're running in this process, create a separate task queue. Otherwise, 1227 # If we're running in this process, create a separate task queue. Otherwise,
1097 # if Apply has already been called with process_count > 1, then there will 1228 # if Apply has already been called with process_count > 1, then there will
1098 # be consumer pools trying to use our processes. 1229 # be consumer pools trying to use our processes.
1099 if process_count > 1: 1230 if process_count > 1:
1100 task_queue = task_queues[self.recursive_apply_level] 1231 task_queue = task_queues[self.recursive_apply_level]
1101 else: 1232 else:
1102 task_queue = _NewMultiprocessingQueue() 1233 task_queue = _NewMultiprocessingQueue()
1103 1234
1104 # Kick off a producer thread to throw tasks in the global task queue. We 1235 # Kick off a producer thread to throw tasks in the global task queue. We
(...skipping 20 matching lines...) Expand all
1125 # pools, or the wakeup call was meant for someone else. It's 1256 # pools, or the wakeup call was meant for someone else. It's
1126 # impossible for both conditions to be true, since the main thread is 1257 # impossible for both conditions to be true, since the main thread is
1127 # blocked on any other ongoing calls to Apply, and a thread would not 1258 # blocked on any other ongoing calls to Apply, and a thread would not
1128 # ask for a new consumer pool unless it had more work to do. 1259 # ask for a new consumer pool unless it had more work to do.
1129 if call_completed_map[caller_id]: 1260 if call_completed_map[caller_id]:
1130 break 1261 break
1131 elif is_main_thread and new_pool_needed.value: 1262 elif is_main_thread and new_pool_needed.value:
1132 new_pool_needed.value = 0 1263 new_pool_needed.value = 0
1133 self._CreateNewConsumerPool(process_count, thread_count) 1264 self._CreateNewConsumerPool(process_count, thread_count)
1134 need_pool_or_done_cond.notify_all() 1265 need_pool_or_done_cond.notify_all()
1135 1266
1136 # Note that we must check the above conditions before the wait() call; 1267 # Note that we must check the above conditions before the wait() call;
1137 # otherwise, the notification can happen before we start waiting, in 1268 # otherwise, the notification can happen before we start waiting, in
1138 # which case we'll block forever. 1269 # which case we'll block forever.
1139 need_pool_or_done_cond.wait() 1270 need_pool_or_done_cond.wait()
1140 else: # Using a single process. 1271 else: # Using a single process.
1141 shard = 0 1272 self._ApplyThreads(thread_count, process_count,
1142 self._ApplyThreads(thread_count, self.recursive_apply_level, shard, 1273 self.recursive_apply_level,
1143 is_blocking_call=True, task_queue=task_queue) 1274 is_blocking_call=True, task_queue=task_queue)
1144 1275
1145 # We encountered an exception from the producer thread before any arguments 1276 # We encountered an exception from the producer thread before any arguments
1146 # were enqueued, but it wouldn't have been propagated, so we'll now 1277 # were enqueued, but it wouldn't have been propagated, so we'll now
1147 # explicitly raise it here. 1278 # explicitly raise it here.
1148 if producer_thread.unknown_exception: 1279 if producer_thread.unknown_exception:
1280 # pylint: disable=raising-bad-type
1149 raise producer_thread.unknown_exception 1281 raise producer_thread.unknown_exception
1150 1282
1151 # We encountered an exception from the producer thread while iterating over 1283 # We encountered an exception from the producer thread while iterating over
1152 # the arguments, so raise it here if we're meant to fail on error. 1284 # the arguments, so raise it here if we're meant to fail on error.
1153 if producer_thread.iterator_exception and fail_on_error: 1285 if producer_thread.iterator_exception and fail_on_error:
1286 # pylint: disable=raising-bad-type
1154 raise producer_thread.iterator_exception 1287 raise producer_thread.iterator_exception
1155 1288
1156 def _ApplyThreads(self, thread_count, recursive_apply_level, shard, 1289 def _ApplyThreads(self, thread_count, process_count, recursive_apply_level,
1157 is_blocking_call=False, task_queue=None): 1290 is_blocking_call=False, task_queue=None):
1158 """ 1291 """Assigns the work from the multi-process global task queue.
1159 Assigns the work from the global task queue shared among all processes 1292
1160 to an individual process, for later consumption either by the WorkerThreads 1293 Work is assigned to an individual process for later consumption either by
1161 or in this thread if thread_count == 1. 1294 the WorkerThreads or (if thread_count == 1) this thread.
1162 1295
1163 Args: 1296 Args:
1164 thread_count: The number of threads used to perform the work. If 1, then 1297 thread_count: The number of threads used to perform the work. If 1, then
1165 perform all work in this thread. 1298 perform all work in this thread.
1299 process_count: The number of processes used to perform the work.
1166 recursive_apply_level: The depth in the tree of recursive calls to Apply 1300 recursive_apply_level: The depth in the tree of recursive calls to Apply
1167 of this thread. 1301 of this thread.
1168 shard: Assigned subset (shard number) for this function.
1169 is_blocking_call: True iff the call to Apply is blocked on this call 1302 is_blocking_call: True iff the call to Apply is blocked on this call
1170 (which is true iff process_count == 1), implying that 1303 (which is true iff process_count == 1), implying that
1171 _ApplyThreads must behave as a blocking call. 1304 _ApplyThreads must behave as a blocking call.
1172 """ 1305 """
1173 self._ResetConnectionPool() 1306 self._ResetConnectionPool()
1174 1307 self.recursive_apply_level = recursive_apply_level
1308
1175 task_queue = task_queue or task_queues[recursive_apply_level] 1309 task_queue = task_queue or task_queues[recursive_apply_level]
1176 1310
1311 assert thread_count * process_count > 1, (
1312 'Invalid state, calling command._ApplyThreads with only one thread '
1313 'and process.')
1177 if thread_count > 1: 1314 if thread_count > 1:
1178 worker_pool = WorkerPool(thread_count) 1315 worker_pool = WorkerPool(
1179 else: 1316 thread_count, self.logger,
1180 worker_pool = SameThreadWorkerPool(self) 1317 bucket_storage_uri_class=self.bucket_storage_uri_class,
1318 gsutil_api_map=self.gsutil_api_map, debug=self.debug)
1319 elif process_count > 1:
1320 worker_pool = SameThreadWorkerPool(
1321 self, bucket_storage_uri_class=self.bucket_storage_uri_class,
1322 gsutil_api_map=self.gsutil_api_map, debug=self.debug)
1181 1323
1182 num_enqueued = 0 1324 num_enqueued = 0
1183 while True: 1325 while True:
1184 task = task_queue.get() 1326 task = task_queue.get()
1185 if task.args != ZERO_TASKS_TO_DO_ARGUMENT: 1327 if task.args != ZERO_TASKS_TO_DO_ARGUMENT:
1186 # If we have no tasks to do and we're performing a blocking call, we 1328 # If we have no tasks to do and we're performing a blocking call, we
1187 # need a special signal to tell us to stop - otherwise, we block on 1329 # need a special signal to tell us to stop - otherwise, we block on
1188 # the call to task_queue.get() forever. 1330 # the call to task_queue.get() forever.
1189 worker_pool.AddTask(task) 1331 worker_pool.AddTask(task)
1190 num_enqueued += 1 1332 num_enqueued += 1
(...skipping 14 matching lines...) Expand all
1205 # notified before we grabbed the lock. 1347 # notified before we grabbed the lock.
1206 return 1348 return
1207 need_pool_or_done_cond.wait() 1349 need_pool_or_done_cond.wait()
1208 1350
1209 1351
1210 # Below here lie classes and functions related to controlling the flow of tasks 1352 # Below here lie classes and functions related to controlling the flow of tasks
1211 # between various threads and processes. 1353 # between various threads and processes.
1212 1354
1213 1355
1214 class _ConsumerPool(object): 1356 class _ConsumerPool(object):
1357
1215 def __init__(self, processes, task_queue): 1358 def __init__(self, processes, task_queue):
1216 self.processes = processes 1359 self.processes = processes
1217 self.task_queue = task_queue 1360 self.task_queue = task_queue
1218 1361
1219 def ShutDown(self): 1362 def ShutDown(self):
1220 for process in self.processes: 1363 for process in self.processes:
1221 KillProcess(process.pid) 1364 KillProcess(process.pid)
1222 1365
1223 1366
def KillProcess(pid):
  """Makes a best-effort attempt to kill the process with the given pid.

  Failures (e.g. the process has already exited) are deliberately ignored.

  Args:
    pid: Process ID of the process to kill.
  """
  # os.kill doesn't work in 2.X or 3.Y on Windows for any X < 7 or Y < 2,
  # so fall back to the Win32 API via ctypes on those interpreter versions.
  needs_win32_fallback = IS_WINDOWS and (
      (2, 6) <= sys.version_info[:3] < (2, 7) or
      (3, 0) <= sys.version_info[:3] < (3, 2))
  if needs_win32_fallback:
    try:
      kernel32 = ctypes.windll.kernel32
      proc_handle = kernel32.OpenProcess(1, 0, pid)
      kernel32.TerminateProcess(proc_handle, 0)
    except:  # pylint: disable=bare-except
      pass
  else:
    try:
      os.kill(pid, signal.SIGKILL)
    except OSError:
      pass
1239 1382
1240 1383
class Task(namedtuple('Task', (
    'func args caller_id exception_handler should_return_results arg_checker '
    'fail_on_error'))):
  """Immutable record describing one unit of work to be performed by Apply.

  Attributes:
    func: The function to be executed.
    args: The arguments to func.
    caller_id: The globally-unique caller ID corresponding to the Apply call.
    exception_handler: The exception handler to use if the call to func fails.
    should_return_results: True iff the results of this function should be
                           returned from the Apply call.
    arg_checker: Used to determine whether we should process the current
                 argument or simply skip it. Also handles any logging that
                 is specific to a particular type of argument.
    fail_on_error: If true, then raise any exceptions encountered when
                   executing func. This is only applicable in the case of
                   process_count == thread_count == 1.
  """
  pass
1260 1404
1261 1405
1262 class ProducerThread(threading.Thread): 1406 class ProducerThread(threading.Thread):
1263 """Thread used to enqueue work for other processes and threads.""" 1407 """Thread used to enqueue work for other processes and threads."""
1408
1264 def __init__(self, cls, args_iterator, caller_id, func, task_queue, 1409 def __init__(self, cls, args_iterator, caller_id, func, task_queue,
1265 should_return_results, exception_handler, arg_checker, 1410 should_return_results, exception_handler, arg_checker,
1266 fail_on_error): 1411 fail_on_error):
1267 """ 1412 """Initializes the producer thread.
1413
1268 Args: 1414 Args:
1269 cls: Instance of Command for which this ProducerThread was created. 1415 cls: Instance of Command for which this ProducerThread was created.
1270 args_iterator: Iterable collection of arguments to be put into the 1416 args_iterator: Iterable collection of arguments to be put into the
1271 work queue. 1417 work queue.
1272 caller_id: Globally-unique caller ID corresponding to this call to Apply. 1418 caller_id: Globally-unique caller ID corresponding to this call to Apply.
1273 func: The function to be called on each element of args_iterator. 1419 func: The function to be called on each element of args_iterator.
1274 task_queue: The queue into which tasks will be put, to later be consumed 1420 task_queue: The queue into which tasks will be put, to later be consumed
1275 by Command._ApplyThreads. 1421 by Command._ApplyThreads.
1276 should_return_results: True iff the results for this call to command.Apply 1422 should_return_results: True iff the results for this call to command.Apply
1277 were requested. 1423 were requested.
(...skipping 26 matching lines...) Expand all
1304 num_tasks = 0 1450 num_tasks = 0
1305 cur_task = None 1451 cur_task = None
1306 last_task = None 1452 last_task = None
1307 try: 1453 try:
1308 args_iterator = iter(self.args_iterator) 1454 args_iterator = iter(self.args_iterator)
1309 while True: 1455 while True:
1310 try: 1456 try:
1311 args = args_iterator.next() 1457 args = args_iterator.next()
1312 except StopIteration, e: 1458 except StopIteration, e:
1313 break 1459 break
1314 except Exception, e: 1460 except Exception, e: # pylint: disable=broad-except
1461 _IncrementFailureCount()
1315 if self.fail_on_error: 1462 if self.fail_on_error:
1316 self.iterator_exception = e 1463 self.iterator_exception = e
1317 raise 1464 raise
1318 else: 1465 else:
1319 try: 1466 try:
1320 self.exception_handler(self.cls, e) 1467 self.exception_handler(self.cls, e)
1321 except Exception, e1: 1468 except Exception, _: # pylint: disable=broad-except
1322 self.cls.logger.debug( 1469 self.cls.logger.debug(
1323 'Caught exception while handling exception for %s:\n%s', 1470 'Caught exception while handling exception for %s:\n%s',
1324 self.func, traceback.format_exc()) 1471 self.func, traceback.format_exc())
1325 self.shared_variables_updater.Update(self.caller_id, self.cls) 1472 self.shared_variables_updater.Update(self.caller_id, self.cls)
1326 continue 1473 continue
1327 1474
1328 if self.arg_checker(self.cls, args): 1475 if self.arg_checker(self.cls, args):
1329 num_tasks += 1 1476 num_tasks += 1
1330 last_task = cur_task 1477 last_task = cur_task
1331 cur_task = Task(self.func, args, self.caller_id, 1478 cur_task = Task(self.func, args, self.caller_id,
1332 self.exception_handler, self.should_return_results, 1479 self.exception_handler, self.should_return_results,
1333 self.arg_checker, self.fail_on_error) 1480 self.arg_checker, self.fail_on_error)
1334 if last_task: 1481 if last_task:
1335 self.task_queue.put(last_task, self.caller_id) 1482 self.task_queue.put(last_task)
1336 except Exception, e: 1483 except Exception, e: # pylint: disable=broad-except
1337 # This will also catch any exception raised due to an error in the 1484 # This will also catch any exception raised due to an error in the
1338 # iterator when fail_on_error is set, so check that we failed for some 1485 # iterator when fail_on_error is set, so check that we failed for some
1339 # other reason before claiming that we had an unknown exception. 1486 # other reason before claiming that we had an unknown exception.
1340 if not self.iterator_exception: 1487 if not self.iterator_exception:
1341 self.unknown_exception = e 1488 self.unknown_exception = e
1342 finally: 1489 finally:
1343 # We need to make sure to update total_tasks[caller_id] before we enqueue 1490 # We need to make sure to update total_tasks[caller_id] before we enqueue
1344 # the last task. Otherwise, a worker can retrieve the last task and 1491 # the last task. Otherwise, a worker can retrieve the last task and
1345 # complete it, then check total_tasks and determine that we're not done 1492 # complete it, then check total_tasks and determine that we're not done
1346 # producing all before we update total_tasks. This approach forces workers 1493 # producing all before we update total_tasks. This approach forces workers
1347 # to wait on the last task until after we've updated total_tasks. 1494 # to wait on the last task until after we've updated total_tasks.
1348 total_tasks[self.caller_id] = num_tasks 1495 total_tasks[self.caller_id] = num_tasks
1349 if not cur_task: 1496 if not cur_task:
1350 # This happens if there were zero arguments to be put in the queue. 1497 # This happens if there were zero arguments to be put in the queue.
1351 cur_task = Task(None, ZERO_TASKS_TO_DO_ARGUMENT, self.caller_id, 1498 cur_task = Task(None, ZERO_TASKS_TO_DO_ARGUMENT, self.caller_id,
1352 None, None, None, None) 1499 None, None, None, None)
1353 self.task_queue.put(cur_task, self.caller_id) 1500 self.task_queue.put(cur_task)
1354 1501
1355 # It's possible that the workers finished before we updated total_tasks, 1502 # It's possible that the workers finished before we updated total_tasks,
1356 # so we need to check here as well. 1503 # so we need to check here as well.
1357 _NotifyIfDone(self.caller_id, 1504 _NotifyIfDone(self.caller_id,
1358 caller_id_finished_count.get(self.caller_id)) 1505 caller_id_finished_count.Get(self.caller_id))
1359 1506
1360 1507
class SameThreadWorkerPool(object):
  """Behaves like a WorkerPool, but used for the single-threaded case.

  Wraps one WorkerThread that is never start()ed; each added task is run
  synchronously on the calling thread via PerformTask.
  """

  def __init__(self, cls, bucket_storage_uri_class=None,
               gsutil_api_map=None, debug=0):
    """Initializes the single-threaded pool.

    Args:
      cls: Instance of the command class that gives context to task functions.
      bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
                                Settable for testing/mocking.
      gsutil_api_map: Map of providers and API selector tuples to api classes
                      which can be used to communicate with those providers.
      debug: debug level for the WorkerThread's CloudApiDelegator class.
    """
    self.cls = cls
    # task_queue is None because tasks are executed directly through
    # PerformTask rather than dequeued by a running thread.
    self.worker_thread = WorkerThread(
        None, cls.logger,
        bucket_storage_uri_class=bucket_storage_uri_class,
        gsutil_api_map=gsutil_api_map, debug=debug)

  def AddTask(self, task):
    """Runs the task immediately in the current thread."""
    self.worker_thread.PerformTask(task, self.cls)
1369 1521
1370 1522
class WorkerPool(object):
  """Pool of worker threads to which tasks can be added."""

  def __init__(self, thread_count, logger, bucket_storage_uri_class=None,
               gsutil_api_map=None, debug=0):
    """Creates the shared task queue and starts thread_count WorkerThreads.

    Args:
      thread_count: Number of WorkerThreads to create and start.
      logger: Logger passed through to each WorkerThread.
      bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
                                Settable for testing/mocking.
      gsutil_api_map: Map of providers and API selector tuples to api classes
                      which can be used to communicate with those providers.
      debug: debug level for each thread's CloudApiDelegator class.
    """
    self.task_queue = _NewThreadsafeQueue()
    self.threads = []
    for _ in range(thread_count):
      thread = WorkerThread(
          self.task_queue, logger,
          bucket_storage_uri_class=bucket_storage_uri_class,
          gsutil_api_map=gsutil_api_map, debug=debug)
      self.threads.append(thread)
      thread.start()

  def AddTask(self, task):
    """Enqueues a Task for one of this pool's threads to execute."""
    self.task_queue.put(task)
1383 1540
1384 1541
class WorkerThread(threading.Thread):
  """Thread where all the work will be performed.

  This makes the function calls for Apply and takes care of all error handling,
  return value propagation, and shared_vars.

  Note that this thread is NOT started upon instantiation because the function-
  calling logic is also used in the single-threaded case.
  """

  def __init__(self, task_queue, logger, bucket_storage_uri_class=None,
               gsutil_api_map=None, debug=0):
    """Initializes the worker thread.

    Args:
      task_queue: The thread-safe queue from which this thread should obtain
                  its work.
      logger: Logger to use for this thread.
      bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
                                Settable for testing/mocking.
      gsutil_api_map: Map of providers and API selector tuples to api classes
                      which can be used to communicate with those providers.
                      Used for the instantiating CloudApiDelegator class.
      debug: debug level for the CloudApiDelegator class.
    """
    super(WorkerThread, self).__init__()
    self.task_queue = task_queue
    # Daemonize so that lingering worker threads cannot keep the interpreter
    # alive at exit.
    self.daemon = True
    # Maps caller_id -> per-thread copy of the command class, so context is
    # copied once per caller rather than once per task (see run()).
    self.cached_classes = {}
    self.shared_vars_updater = _SharedVariablesUpdater()

    # Per-thread API delegator, built only when both the storage URI class and
    # the provider/API map were supplied (i.e. in the multi-threaded case).
    self.thread_gsutil_api = None
    if bucket_storage_uri_class and gsutil_api_map:
      self.thread_gsutil_api = CloudApiDelegator(
          bucket_storage_uri_class, gsutil_api_map, logger, debug=debug)

  def PerformTask(self, task, cls):
    """Makes the function call for a task.

    Args:
      task: The Task to perform.
      cls: The instance of a class which gives context to the functions called
           by the Task's function. E.g., see SetAclFuncWrapper.
    """
    caller_id = task.caller_id
    try:
      # Pass this thread's API delegator as thread_state so the called
      # function does not share connections across threads.
      results = task.func(cls, task.args, thread_state=self.thread_gsutil_api)
      if task.should_return_results:
        global_return_values_map.Update(caller_id, [results], default_value=[])
    except Exception, e:  # pylint: disable=broad-except
      _IncrementFailureCount()
      if task.fail_on_error:
        raise  # Only happens for single thread and process case.
      else:
        try:
          task.exception_handler(cls, e)
        except Exception, _:  # pylint: disable=broad-except
          # Don't allow callers to raise exceptions here and kill the worker
          # threads.
          cls.logger.debug(
              'Caught exception while handling exception for %s:\n%s',
              task, traceback.format_exc())
    finally:
      self.shared_vars_updater.Update(caller_id, cls)

      # Even if we encounter an exception, we still need to claim that
      # the function finished executing. Otherwise, we won't know when to
      # stop waiting and return results.
      num_done = caller_id_finished_count.Update(caller_id, 1)

      if cls.multiprocessing_is_available:
        _NotifyIfDone(caller_id, num_done)

  def run(self):
    """Dequeues and performs tasks forever; never returns on its own."""
    while True:
      task = self.task_queue.get()
      caller_id = task.caller_id

      # Get the instance of the command with the appropriate context.
      cls = self.cached_classes.get(caller_id, None)
      if not cls:
        cls = copy.copy(class_map[caller_id])
        cls.logger = CreateGsutilLogger(cls.command_name)
        self.cached_classes[caller_id] = cls

      self.PerformTask(task, cls)
1459 1628
1460 1629
1461 class _SharedVariablesUpdater(object): 1630 class _SharedVariablesUpdater(object):
1462 """Used to update shared variable for a class in the global map. Note that 1631 """Used to update shared variable for a class in the global map.
1463 each thread will have its own instance of the calling class for context, 1632
1464 and it will also have its own instance of a _SharedVariablesUpdater. 1633 Note that each thread will have its own instance of the calling class for
1465 This is used in the following way: 1634 context, and it will also have its own instance of a
1635 _SharedVariablesUpdater. This is used in the following way:
1466 1636
1467 1. Before any tasks are performed, each thread will get a copy of the 1637 1. Before any tasks are performed, each thread will get a copy of the
1468 calling class, and the globally-consistent value of this shared variable 1638 calling class, and the globally-consistent value of this shared variable
1469 will be initialized to whatever it was before the call to Apply began. 1639 will be initialized to whatever it was before the call to Apply began.
1470 1640
1471 2. After each time a thread performs a task, it will look at the current 1641 2. After each time a thread performs a task, it will look at the current
1472 values of the shared variables in its instance of the calling class. 1642 values of the shared variables in its instance of the calling class.
1473 1643
1474 2.A. For each such variable, it computes the delta of this variable 1644 2.A. For each such variable, it computes the delta of this variable
1475 between the last known value for this class (which is stored in 1645 between the last known value for this class (which is stored in
1476 a dict local to this class) and the current value of the variable 1646 a dict local to this class) and the current value of the variable
1477 in the class. 1647 in the class.
1478 1648
1479 2.B. Using this delta, we update the last known value locally as well 1649 2.B. Using this delta, we update the last known value locally as well
1480 as the globally-consistent value shared across all classes (the 1650 as the globally-consistent value shared across all classes (the
1481 globally consistent value is simply increased by the computed 1651 globally consistent value is simply increased by the computed
1482 delta). 1652 delta).
1483 """ 1653 """
1654
1484 def __init__(self): 1655 def __init__(self):
1485 self.last_shared_var_values = {} 1656 self.last_shared_var_values = {}
1486 1657
1487 def Update(self, caller_id, cls): 1658 def Update(self, caller_id, cls):
1488 """Update any shared variables with their deltas.""" 1659 """Update any shared variables with their deltas."""
1489 shared_vars = shared_vars_list_map.get(caller_id, None) 1660 shared_vars = shared_vars_list_map.get(caller_id, None)
1490 if shared_vars: 1661 if shared_vars:
1491 for name in shared_vars: 1662 for name in shared_vars:
1492 key = (caller_id, name) 1663 key = (caller_id, name)
1493 last_value = self.last_shared_var_values.get(key, 0) 1664 last_value = self.last_shared_var_values.get(key, 0)
1494 # Compute the change made since the last time we updated here. This is 1665 # Compute the change made since the last time we updated here. This is
1495 # calculated by simply subtracting the last known value from the current 1666 # calculated by simply subtracting the last known value from the current
1496 # value in the class instance. 1667 # value in the class instance.
1497 delta = getattr(cls, name) - last_value 1668 delta = getattr(cls, name) - last_value
1498 self.last_shared_var_values[key] = delta + last_value 1669 self.last_shared_var_values[key] = delta + last_value
1499 1670
1500 # Update the globally-consistent value by simply increasing it by the 1671 # Update the globally-consistent value by simply increasing it by the
1501 # computed delta. 1672 # computed delta.
1502 shared_vars_map.update(key, delta) 1673 shared_vars_map.Update(key, delta)
1503 1674
1504 1675
def _NotifyIfDone(caller_id, num_done):
  """Notify any threads waiting for results that something has finished.

  Each waiting thread will then need to check the call_completed_map to see if
  its work is done.

  Note that num_done could be calculated here, but it is passed in as an
  optimization so that we have one less call to a globally-locked data
  structure.

  Args:
    caller_id: The caller_id of the function whose progress we're checking.
    num_done: The number of tasks currently completed for that caller_id.
  """
  expected = total_tasks[caller_id]
  if expected < 0 or num_done != expected:
    # Either the total is not yet known (presumably a negative sentinel set
    # before the producer finishes counting - confirm against ProducerThread)
    # or there is still outstanding work.
    return
  # Notify the Apply call that's sleeping that it's ready to return.
  with need_pool_or_done_cond:
    call_completed_map[caller_id] = True
    need_pool_or_done_cond.notify_all()
1525 1696
1697
def ShutDownGsutil():
  """Shut down all processes in consumer pools in preparation for exiting."""
  for queue in queues:
    try:
      queue.cancel_join_thread()
    except:  # pylint: disable=bare-except
      # Best-effort cleanup; ignore queues that cannot be canceled.
      pass
  for pool in consumer_pools:
    pool.ShutDown()
1535 1707
1708
# pylint: disable=global-variable-undefined
def _IncrementFailureCount():
  """Increments the global count of failures by one.

  failure_count is a plain int in the single-process case and a
  multiprocessing.Value() of type 'i' when worker processes are used.
  """
  global failure_count
  if isinstance(failure_count, int):
    failure_count += 1
  else:  # Otherwise it's a multiprocessing.Value() of type 'i'.
    # += on a multiprocessing.Value is read-modify-write and not atomic;
    # take the Value's lock so concurrent processes don't lose increments.
    # (Assumes the Value was created with a lock, the default - confirm.)
    with failure_count.get_lock():
      failure_count.value += 1
1716
1717
# pylint: disable=global-variable-undefined
def GetFailureCount():
  """Returns the number of failures processed during calls to Apply()."""
  try:
    count = failure_count
  except NameError:  # If it wasn't initialized, Apply() wasn't called.
    return 0
  if isinstance(count, int):
    return count
  return count.value  # It's a multiprocessing.Value() of type 'i'.
1728
1729
def ResetFailureCount():
  """Resets the failure_count variable to 0 - useful if error is expected."""
  global failure_count
  try:
    current = failure_count
  except NameError:  # If it wasn't initialized, Apply() wasn't called.
    return
  if isinstance(current, int):
    failure_count = 0
  else:
    # It's a multiprocessing.Value() of type 'i'; replace it with a fresh
    # zeroed Value.
    failure_count = multiprocessing.Value('i', 0)
OLDNEW
« no previous file with comments | « gslib/cloud_api_helper.py ('k') | gslib/command_runner.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698