OLD | NEW |
| (Empty) |
1 # -*- coding: utf-8 -*- | |
2 # Copyright 2010 Google Inc. All Rights Reserved. | |
3 # | |
4 # Licensed under the Apache License, Version 2.0 (the "License"); | |
5 # you may not use this file except in compliance with the License. | |
6 # You may obtain a copy of the License at | |
7 # | |
8 # http://www.apache.org/licenses/LICENSE-2.0 | |
9 # | |
10 # Unless required by applicable law or agreed to in writing, software | |
11 # distributed under the License is distributed on an "AS IS" BASIS, | |
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 # See the License for the specific language governing permissions and | |
14 # limitations under the License. | |
15 """Base class for gsutil commands. | |
16 | |
17 In addition to base class code, this file contains helpers that depend on base | |
18 class state (such as GetAndPrintAcl). In general, functions that depend on | |
19 class state and that are used by multiple commands belong in this file. | |
20 Functions that don't depend on class state belong in util.py, and non-shared | |
21 helpers belong in individual subclasses. | |
22 """ | |
23 | |
24 from __future__ import absolute_import | |
25 | |
26 import codecs | |
27 from collections import namedtuple | |
28 import copy | |
29 import getopt | |
30 import logging | |
31 import multiprocessing | |
32 import os | |
33 import Queue | |
34 import signal | |
35 import sys | |
36 import textwrap | |
37 import threading | |
38 import traceback | |
39 | |
40 import boto | |
41 from boto.storage_uri import StorageUri | |
42 import gslib | |
43 from gslib.cloud_api import AccessDeniedException | |
44 from gslib.cloud_api import ArgumentException | |
45 from gslib.cloud_api import ServiceException | |
46 from gslib.cloud_api_delegator import CloudApiDelegator | |
47 from gslib.cs_api_map import ApiSelector | |
48 from gslib.cs_api_map import GsutilApiMapFactory | |
49 from gslib.exception import CommandException | |
50 from gslib.help_provider import HelpProvider | |
51 from gslib.name_expansion import NameExpansionIterator | |
52 from gslib.name_expansion import NameExpansionResult | |
53 from gslib.parallelism_framework_util import AtomicIncrementDict | |
54 from gslib.parallelism_framework_util import BasicIncrementDict | |
55 from gslib.parallelism_framework_util import ThreadAndProcessSafeDict | |
56 from gslib.plurality_checkable_iterator import PluralityCheckableIterator | |
57 from gslib.sig_handling import RegisterSignalHandler | |
58 from gslib.storage_url import StorageUrlFromString | |
59 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages | |
60 from gslib.translation_helper import AclTranslation | |
61 from gslib.util import GetConfigFilePath | |
62 from gslib.util import GsutilStreamHandler | |
63 from gslib.util import HaveFileUrls | |
64 from gslib.util import HaveProviderUrls | |
65 from gslib.util import IS_WINDOWS | |
66 from gslib.util import MultiprocessingIsAvailable | |
67 from gslib.util import NO_MAX | |
68 from gslib.util import UrlsAreForSingleProvider | |
69 from gslib.util import UTF8 | |
70 from gslib.wildcard_iterator import CreateWildcardIterator | |
71 | |
72 OFFER_GSUTIL_M_SUGGESTION_THRESHOLD = 5 | |
73 | |
74 if IS_WINDOWS: | |
75 import ctypes # pylint: disable=g-import-not-at-top | |
76 | |
77 | |
78 def _DefaultExceptionHandler(cls, e): | |
79 cls.logger.exception(e) | |
80 | |
81 | |
82 def CreateGsutilLogger(command_name): | |
83 """Creates a logger that resembles 'print' output. | |
84 | |
85 This logger abides by gsutil -d/-D/-DD/-q options. | |
86 | |
87 By default (if none of the above options is specified) the logger will display | |
88 all messages logged with level INFO or above. Log propagation is disabled. | |
89 | |
90 Args: | |
91 command_name: Command name to create logger for. | |
92 | |
93 Returns: | |
94 A logger object. | |
95 """ | |
96 log = logging.getLogger(command_name) | |
97 log.propagate = False | |
98 log.setLevel(logging.root.level) | |
99 log_handler = GsutilStreamHandler() | |
100 log_handler.setFormatter(logging.Formatter('%(message)s')) | |
101 # Commands that call other commands (like mv) would cause log handlers to be | |
102 # added more than once, so avoid adding if one is already present. | |
103 if not log.handlers: | |
104 log.addHandler(log_handler) | |
105 return log | |
106 | |
107 | |
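A brief usage sketch of the logger returned above; 'mycmd' is an arbitrary example name:

```python
log = CreateGsutilLogger('mycmd')
log.info('Copying file...')   # emitted bare, like print: "Copying file..."
log.debug('verbose detail')   # suppressed unless -d/-D lowered the root level
```
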
108 def _UrlArgChecker(command_instance, url): | |
109 if not command_instance.exclude_symlinks: | |
110 return True | |
111 exp_src_url = url.expanded_storage_url | |
112 if exp_src_url.IsFileUrl() and os.path.islink(exp_src_url.object_name): | |
113 command_instance.logger.info('Skipping symbolic link %s...', exp_src_url) | |
114 return False | |
115 return True | |
116 | |
117 | |
118 def DummyArgChecker(*unused_args): | |
119 return True | |
120 | |
121 | |
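A hedged note on intent: commands whose Apply arguments are not URLs can pass this as arg_checker so no per-argument URL/symlink screening or logging happens. In the sketch below, _ProcessItem and items_iterator are hypothetical names; Apply itself is defined later in this class.

```python
# Sketch only (hypothetical names): skip URL/symlink checks for non-URL args.
self.Apply(_ProcessItem, items_iterator, _DefaultExceptionHandler,
           arg_checker=DummyArgChecker)
```
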
122 def SetAclFuncWrapper(cls, name_expansion_result, thread_state=None): | |
123 return cls.SetAclFunc(name_expansion_result, thread_state=thread_state) | |
124 | |
125 | |
126 def SetAclExceptionHandler(cls, e): | |
127 """Exception handler that maintains state about post-completion status.""" | |
128 cls.logger.error(str(e)) | |
129 cls.everything_set_okay = False | |
130 | |
131 # We will keep this list of all thread- or process-safe queues ever created by | |
132 # the main thread so that we can forcefully kill them upon shutdown. Otherwise, | |
133 # we encounter a Python bug in which empty queues block forever on join (which | |
134 # is called as part of the Python exit function cleanup) under the impression | |
135 # that they are non-empty. | |
136 # However, this also lets us shut down somewhat more cleanly when interrupted. | |
137 queues = [] | |
138 | |
139 | |
140 def _NewMultiprocessingQueue(): | |
141 queue = multiprocessing.Queue(MAX_QUEUE_SIZE) | |
142 queues.append(queue) | |
143 return queue | |
144 | |
145 | |
146 def _NewThreadsafeQueue(): | |
147 queue = Queue.Queue(MAX_QUEUE_SIZE) | |
148 queues.append(queue) | |
149 return queue | |
150 | |
151 # The maximum size of a process- or thread-safe queue. Imposing this limit | |
152 # prevents us from needing to hold an arbitrary amount of data in memory. | |
153 # However, setting this number too high (e.g., >= 32768 on OS X) can cause | |
154 # problems on some operating systems. | |
155 MAX_QUEUE_SIZE = 32500 | |
156 | |
157 # The maximum depth of the tree of recursive calls to command.Apply. This is | |
158 # an arbitrary limit put in place to prevent developers from accidentally | |
159 # causing problems with infinite recursion, and it can be increased if needed. | |
160 MAX_RECURSIVE_DEPTH = 5 | |
161 | |
162 ZERO_TASKS_TO_DO_ARGUMENT = ('There were no', 'tasks to do') | |
163 | |
164 # Map from deprecated aliases to the current command and subcommands that | |
165 # provide the same behavior. | |
166 # TODO: Remove this map and deprecate old commands on 9/9/14. | |
167 OLD_ALIAS_MAP = {'chacl': ['acl', 'ch'], | |
168 'getacl': ['acl', 'get'], | |
169 'setacl': ['acl', 'set'], | |
170 'getcors': ['cors', 'get'], | |
171 'setcors': ['cors', 'set'], | |
172 'chdefacl': ['defacl', 'ch'], | |
173 'getdefacl': ['defacl', 'get'], | |
174 'setdefacl': ['defacl', 'set'], | |
175 'disablelogging': ['logging', 'set', 'off'], | |
176 'enablelogging': ['logging', 'set', 'on'], | |
177 'getlogging': ['logging', 'get'], | |
178 'getversioning': ['versioning', 'get'], | |
179 'setversioning': ['versioning', 'set'], | |
180 'getwebcfg': ['web', 'get'], | |
181 'setwebcfg': ['web', 'set']} | |
182 | |
183 | |
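How an entry is interpreted (see _TranslateDeprecatedAliases below): the first element is the replacement command name and any remaining elements are prepended to the user's arguments.

```python
# Illustration only:
OLD_ALIAS_MAP['disablelogging']  # -> ['logging', 'set', 'off']
# so "gsutil disablelogging gs://bucket" behaves like
# "gsutil logging set off gs://bucket", plus a deprecation warning.
```
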
184 # Declare all of the module level variables - see | |
185 # InitializeMultiprocessingVariables for an explanation of why this is | |
186 # necessary. | |
187 # pylint: disable=global-at-module-level | |
188 global manager, consumer_pools, task_queues, caller_id_lock, caller_id_counter | |
189 global total_tasks, call_completed_map, global_return_values_map | |
190 global need_pool_or_done_cond, caller_id_finished_count, new_pool_needed | |
191 global current_max_recursive_level, shared_vars_map, shared_vars_list_map | |
192 global class_map, worker_checking_level_lock, failure_count | |
193 | |
194 | |
195 def InitializeMultiprocessingVariables(): | |
196 """Initializes module-level variables that will be inherited by subprocesses. | |
197 | |
198 On Windows, a multiprocessing.Manager object should only | |
199 be created within an "if __name__ == '__main__':" block. This function | |
200 must be called, otherwise every command that calls Command.Apply will fail. | |
201 """ | |
202 # This list of global variables must exactly match the above list of | |
203 # declarations. | |
204 # pylint: disable=global-variable-undefined | |
205 global manager, consumer_pools, task_queues, caller_id_lock, caller_id_counter | |
206 global total_tasks, call_completed_map, global_return_values_map | |
207 global need_pool_or_done_cond, caller_id_finished_count, new_pool_needed | |
208 global current_max_recursive_level, shared_vars_map, shared_vars_list_map | |
209 global class_map, worker_checking_level_lock, failure_count | |
210 | |
211 manager = multiprocessing.Manager() | |
212 | |
213 consumer_pools = [] | |
214 | |
215 # List of all existing task queues - used by all pools to find the queue | |
216 # that's appropriate for the given recursive_apply_level. | |
217 task_queues = [] | |
218 | |
219 # Used to assign a globally unique caller ID to each Apply call. | |
220 caller_id_lock = manager.Lock() | |
221 caller_id_counter = multiprocessing.Value('i', 0) | |
222 | |
223 # Map from caller_id to total number of tasks to be completed for that ID. | |
224 total_tasks = ThreadAndProcessSafeDict(manager) | |
225 | |
226 # Map from caller_id to a boolean which is True iff all its tasks are | |
227 # finished. | |
228 call_completed_map = ThreadAndProcessSafeDict(manager) | |
229 | |
230 # Used to keep track of the set of return values for each caller ID. | |
231 global_return_values_map = AtomicIncrementDict(manager) | |
232 | |
233 # Condition used to notify any waiting threads that a task has finished or | |
234 # that a call to Apply needs a new set of consumer processes. | |
235 need_pool_or_done_cond = manager.Condition() | |
236 | |
237 # Lock used to prevent multiple worker processes from asking the main thread | |
238 # to create a new consumer pool for the same level. | |
239 worker_checking_level_lock = manager.Lock() | |
240 | |
241 # Map from caller_id to the current number of completed tasks for that ID. | |
242 caller_id_finished_count = AtomicIncrementDict(manager) | |
243 | |
244 # Used as a way for the main thread to distinguish between being woken up | |
245 # by another call finishing and being woken up by a call that needs a new set | |
246 # of consumer processes. | |
247 new_pool_needed = multiprocessing.Value('i', 0) | |
248 | |
249 current_max_recursive_level = multiprocessing.Value('i', 0) | |
250 | |
251 # Map from (caller_id, name) to the value of that shared variable. | |
252 shared_vars_map = AtomicIncrementDict(manager) | |
253 shared_vars_list_map = ThreadAndProcessSafeDict(manager) | |
254 | |
255 # Map from caller_id to calling class. | |
256 class_map = manager.dict() | |
257 | |
258 # Number of tasks that resulted in an exception in calls to Apply(). | |
259 failure_count = multiprocessing.Value('i', 0) | |
260 | |
261 | |
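A sketch of the required call order implied by the docstring above; the surrounding main-module code is illustrative, not gsutil's actual entry point:

```python
# Illustrative only: the top-level entry point initializes these globals under
# the __main__ guard (multiprocessing.Manager needs this on Windows) before
# any Command.Apply call can run.
if __name__ == '__main__':
  InitializeMultiprocessingVariables()
  # ... construct and run the requested Command here ...
```
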
262 # Each subclass of Command must define a property named 'command_spec' that is | |
263 # an instance of the following class. | |
264 CommandSpec = namedtuple('CommandSpec', [ | |
265 # Name of command. | |
266 'command_name', | |
267 # Usage synopsis. | |
268 'usage_synopsis', | |
269 # List of command name aliases. | |
270 'command_name_aliases', | |
271 # Min number of args required by this command. | |
272 'min_args', | |
273 # Max number of args required by this command, or NO_MAX. | |
274 'max_args', | |
275 # Getopt-style string specifying acceptable sub args. | |
276 'supported_sub_args', | |
277 # True if file URLs are acceptable for this command. | |
278 'file_url_ok', | |
279 # True if provider-only URLs are acceptable for this command. | |
280 'provider_url_ok', | |
281 # Index in args of first URL arg. | |
282 'urls_start_arg', | |
283 # List of supported APIs | |
284 'gs_api_support', | |
285 # Default API to use for this command | |
286 'gs_default_api', | |
287 # Private arguments (for internal testing) | |
288 'supported_private_args', | |
289 'argparse_arguments', | |
290 ]) | |
291 | |
292 | |
293 class Command(HelpProvider): | |
294 """Base class for all gsutil commands.""" | |
295 | |
296 # Each subclass must override this with an instance of CommandSpec. | |
297 command_spec = None | |
298 | |
299 _commands_with_subcommands_and_subopts = ['acl', 'defacl', 'logging', 'web', | |
300 'notification'] | |
301 | |
302 # This keeps track of the recursive depth of the current call to Apply. | |
303 recursive_apply_level = 0 | |
304 | |
305 # If the multiprocessing module isn't available, we'll use this to keep track | |
306 # of the caller_id. | |
307 sequential_caller_id = -1 | |
308 | |
309 @staticmethod | |
310 def CreateCommandSpec(command_name, usage_synopsis=None, | |
311 command_name_aliases=None, min_args=0, | |
312 max_args=NO_MAX, supported_sub_args='', | |
313 file_url_ok=False, provider_url_ok=False, | |
314 urls_start_arg=0, gs_api_support=None, | |
315 gs_default_api=None, supported_private_args=None, | |
316 argparse_arguments=None): | |
317 """Creates an instance of CommandSpec, with defaults.""" | |
318 return CommandSpec( | |
319 command_name=command_name, | |
320 usage_synopsis=usage_synopsis, | |
321 command_name_aliases=command_name_aliases or [], | |
322 min_args=min_args, | |
323 max_args=max_args, | |
324 supported_sub_args=supported_sub_args, | |
325 file_url_ok=file_url_ok, | |
326 provider_url_ok=provider_url_ok, | |
327 urls_start_arg=urls_start_arg, | |
328 gs_api_support=gs_api_support or [ApiSelector.XML], | |
329 gs_default_api=gs_default_api or ApiSelector.XML, | |
330 supported_private_args=supported_private_args, | |
331 argparse_arguments=argparse_arguments or []) | |
332 | |
333 # Define a convenience property for command name, since it's used many places. | |
334 def _GetDefaultCommandName(self): | |
335 return self.command_spec.command_name | |
336 command_name = property(_GetDefaultCommandName) | |
337 | |
338 def _CalculateUrlsStartArg(self): | |
339 """Calculate the index in args of the first URL arg. | |
340 | |
341 Returns: | |
342 Index of the first URL arg (according to the command spec). | |
343 """ | |
344 return self.command_spec.urls_start_arg | |
345 | |
346 def _TranslateDeprecatedAliases(self, args): | |
347 """Map deprecated aliases to the corresponding new command, and warn.""" | |
348 new_command_args = OLD_ALIAS_MAP.get(self.command_alias_used, None) | |
349 if new_command_args: | |
350 # Prepend any subcommands for the new command. The command name itself | |
351 # is not part of the args, so leave it out. | |
352 args = new_command_args[1:] + args | |
353 self.logger.warn('\n'.join(textwrap.wrap( | |
354 ('You are using a deprecated alias, "%(used_alias)s", for the ' | |
355 '"%(command_name)s" command. This will stop working on 9/9/2014. ' | |
356 'Please use "%(command_name)s" with the appropriate sub-command in ' | |
357 'the future. See "gsutil help %(command_name)s" for details.') % | |
358 {'used_alias': self.command_alias_used, | |
359 'command_name': self.command_name}))) | |
360 return args | |
361 | |
362 def __init__(self, command_runner, args, headers, debug, parallel_operations, | |
363 bucket_storage_uri_class, gsutil_api_class_map_factory, | |
364 test_method=None, logging_filters=None, | |
365 command_alias_used=None): | |
366 """Instantiates a Command. | |
367 | |
368 Args: | |
369 command_runner: CommandRunner (for commands built atop other commands). | |
370 args: Command-line args (arg0 = actual arg, not command name ala bash). | |
371 headers: Dictionary containing optional HTTP headers to pass to boto. | |
372 debug: Debug level to pass in to boto connection (range 0..3). | |
373 parallel_operations: Should command operations be executed in parallel? | |
374 bucket_storage_uri_class: Class to instantiate for cloud StorageUris. | |
375 Settable for testing/mocking. | |
376 gsutil_api_class_map_factory: Creates map of cloud storage interfaces. | |
377 Settable for testing/mocking. | |
378 test_method: Optional general purpose method for testing purposes. | |
379 Application and semantics of this method will vary by | |
380 command and test type. | |
381 logging_filters: Optional list of logging.Filters to apply to this | |
382 command's logger. | |
383 command_alias_used: The alias that was actually used when running this | |
384 command (as opposed to the "official" command name, | |
385 which will always correspond to the file name). | |
386 | |
387 Implementation note: subclasses shouldn't need to define an __init__ | |
388 method, and instead depend on the shared initialization that happens | |
389 here. If you do define an __init__ method in a subclass you'll need to | |
390 explicitly call super().__init__(). But you're encouraged not to do this, | |
391 because it will make changing the __init__ interface more painful. | |
392 """ | |
393 # Save class values from constructor params. | |
394 self.command_runner = command_runner | |
395 self.unparsed_args = args | |
396 self.headers = headers | |
397 self.debug = debug | |
398 self.parallel_operations = parallel_operations | |
399 self.bucket_storage_uri_class = bucket_storage_uri_class | |
400 self.gsutil_api_class_map_factory = gsutil_api_class_map_factory | |
401 self.test_method = test_method | |
402 self.exclude_symlinks = False | |
403 self.recursion_requested = False | |
404 self.all_versions = False | |
405 self.command_alias_used = command_alias_used | |
406 | |
407 # Global instance of a threaded logger object. | |
408 self.logger = CreateGsutilLogger(self.command_name) | |
409 if logging_filters: | |
410 for log_filter in logging_filters: | |
411 self.logger.addFilter(log_filter) | |
412 | |
413 if self.command_spec is None: | |
414 raise CommandException('"%s" command implementation is missing a ' | |
415 'command_spec definition.' % self.command_name) | |
416 | |
417 # Parse and validate args. | |
418 self.args = self._TranslateDeprecatedAliases(args) | |
419 self.ParseSubOpts() | |
420 | |
421 # Named tuple public functions start with _ | |
422 # pylint: disable=protected-access | |
423 self.command_spec = self.command_spec._replace( | |
424 urls_start_arg=self._CalculateUrlsStartArg()) | |
425 | |
426 if (len(self.args) < self.command_spec.min_args | |
427 or len(self.args) > self.command_spec.max_args): | |
428 self.RaiseWrongNumberOfArgumentsException() | |
429 | |
430 if self.command_name not in self._commands_with_subcommands_and_subopts: | |
431 self.CheckArguments() | |
432 | |
433 # Build the support and default maps from the command spec. | |
434 support_map = { | |
435 'gs': self.command_spec.gs_api_support, | |
436 's3': [ApiSelector.XML] | |
437 } | |
438 default_map = { | |
439 'gs': self.command_spec.gs_default_api, | |
440 's3': ApiSelector.XML | |
441 } | |
442 self.gsutil_api_map = GsutilApiMapFactory.GetApiMap( | |
443 self.gsutil_api_class_map_factory, support_map, default_map) | |
444 | |
445 self.project_id = None | |
446 self.gsutil_api = CloudApiDelegator( | |
447 bucket_storage_uri_class, self.gsutil_api_map, | |
448 self.logger, debug=self.debug) | |
449 | |
450 # Cross-platform path to run gsutil binary. | |
451 self.gsutil_cmd = '' | |
452 # If running on Windows, invoke python interpreter explicitly. | |
453 if gslib.util.IS_WINDOWS: | |
454 self.gsutil_cmd += 'python ' | |
455 # Add full path to gsutil to make sure we test the correct version. | |
456 self.gsutil_path = gslib.GSUTIL_PATH | |
457 self.gsutil_cmd += self.gsutil_path | |
458 | |
459 # We're treating recursion_requested like it's used by all commands, but | |
460 # only some of the commands accept the -R option. | |
461 if self.sub_opts: | |
462 for o, unused_a in self.sub_opts: | |
463 if o == '-r' or o == '-R': | |
464 self.recursion_requested = True | |
465 break | |
466 | |
467 self.multiprocessing_is_available = MultiprocessingIsAvailable()[0] | |
468 | |
469 def RaiseWrongNumberOfArgumentsException(self): | |
470 """Raises exception for wrong number of arguments supplied to command.""" | |
471 if len(self.args) < self.command_spec.min_args: | |
472 tail_str = 's' if self.command_spec.min_args > 1 else '' | |
473 message = ('The %s command requires at least %d argument%s.' % | |
474 (self.command_name, self.command_spec.min_args, tail_str)) | |
475 else: | |
476 message = ('The %s command accepts at most %d arguments.' % | |
477 (self.command_name, self.command_spec.max_args)) | |
478 message += ' Usage:\n%s\nFor additional help run:\n gsutil help %s' % ( | |
479 self.command_spec.usage_synopsis, self.command_name) | |
480 raise CommandException(message) | |
481 | |
482 def RaiseInvalidArgumentException(self): | |
483 """Raises exception for specifying an invalid argument to command.""" | |
484 message = ('Incorrect option(s) specified. Usage:\n%s\n' | |
485 'For additional help run:\n gsutil help %s' % ( | |
486 self.command_spec.usage_synopsis, self.command_name)) | |
487 raise CommandException(message) | |
488 | |
489 def ParseSubOpts(self, check_args=False): | |
490 """Parses sub-opt args. | |
491 | |
492 Args: | |
493 check_args: True to have CheckArguments() called after parsing. | |
494 | |
495 Populates: | |
496 (self.sub_opts, self.args) from parsing. | |
497 | |
498 Raises: RaiseInvalidArgumentException if invalid args specified. | |
499 """ | |
500 try: | |
501 self.sub_opts, self.args = getopt.getopt( | |
502 self.args, self.command_spec.supported_sub_args, | |
503 self.command_spec.supported_private_args or []) | |
504 except getopt.GetoptError: | |
505 self.RaiseInvalidArgumentException() | |
506 if check_args: | |
507 self.CheckArguments() | |
508 | |
509 def CheckArguments(self): | |
510 """Checks that command line arguments match the command_spec. | |
511 | |
512 Any commands in self._commands_with_subcommands_and_subopts are responsible | |
513 for calling this method after handling initial parsing of their arguments. | |
514 This prevents commands with sub-commands as well as options from breaking | |
515 the parsing of getopt. | |
516 | |
517 TODO: Provide a function to parse commands and sub-commands more | |
518 intelligently once we stop allowing the deprecated command versions. | |
519 | |
520 Raises: | |
521 CommandException if the arguments don't match. | |
522 """ | |
523 | |
524 if (not self.command_spec.file_url_ok | |
525 and HaveFileUrls(self.args[self.command_spec.urls_start_arg:])): | |
526 raise CommandException('"%s" command does not support "file://" URLs. ' | |
527 'Did you mean to use a gs:// URL?' % | |
528 self.command_name) | |
529 if (not self.command_spec.provider_url_ok | |
530 and HaveProviderUrls(self.args[self.command_spec.urls_start_arg:])): | |
531 raise CommandException('"%s" command does not support provider-only ' | |
532 'URLs.' % self.command_name) | |
533 | |
534 def WildcardIterator(self, url_string, all_versions=False): | |
535 """Helper to instantiate gslib.WildcardIterator. | |
536 | |
537 Args are same as gslib.WildcardIterator interface, but this method fills in | |
538 most of the values from instance state. | |
539 | |
540 Args: | |
541 url_string: URL string naming wildcard objects to iterate. | |
542 all_versions: If true, the iterator yields all versions of objects | |
543 matching the wildcard. If false, yields just the live | |
544 object version. | |
545 | |
546 Returns: | |
547 WildcardIterator for use by caller. | |
548 """ | |
549 return CreateWildcardIterator( | |
550 url_string, self.gsutil_api, all_versions=all_versions, | |
551 debug=self.debug, project_id=self.project_id) | |
552 | |
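A hedged sketch of how a subclass might consume this helper inside RunCommand; the wildcard URL and the 'name' listing field are illustrative, while IterObjects/bucket_listing_fields mirror their use elsewhere in this file:

```python
# Sketch only: iterate live object versions matching a wildcard.
for blr in self.WildcardIterator('gs://example-bucket/**').IterObjects(
    bucket_listing_fields=['name']):
  self.logger.info('Matched %s', blr.url_string)
```
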
553 def RunCommand(self): | |
554 """Abstract function in base class. Subclasses must implement this. | |
555 | |
556 The return value of this function will be used as the exit status of the | |
557 process, so subclass commands should return an integer exit code (0 for | |
558 success, a value in [1,255] for failure). | |
559 """ | |
560 raise CommandException('Command %s is missing its RunCommand() ' | |
561 'implementation' % self.command_name) | |
562 | |
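To make the contract above concrete, here is a minimal sketch of a hypothetical subclass. The 'frob' name, synopsis, and body are invented for illustration, the gslib.command import path is assumed, and a real command would also define the HelpProvider attributes (e.g. help_spec) omitted here.

```python
from gslib.command import Command  # assumed import path for this module
from gslib.util import NO_MAX


class FrobCommand(Command):
  """Hypothetical command, shown only to illustrate the base-class contract."""

  command_spec = Command.CreateCommandSpec(
      'frob',
      usage_synopsis='frob [-r] url...',
      min_args=1,
      max_args=NO_MAX,
      supported_sub_args='rR',
      urls_start_arg=0)

  def RunCommand(self):
    # self.args, self.logger, etc. are populated by Command.__init__ above.
    for url_str in self.args:
      self.logger.info('Frobbing %s', url_str)
    return 0  # Integer exit status: 0 for success, 1-255 for failure.
```
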
563 ############################################################ | |
564 # Shared helper functions that depend on base class state. # | |
565 ############################################################ | |
566 | |
567 def ApplyAclFunc(self, acl_func, acl_excep_handler, url_strs): | |
568 """Sets the standard or default object ACL depending on self.command_name. | |
569 | |
570 Args: | |
571 acl_func: ACL function to be passed to Apply. | |
572 acl_excep_handler: ACL exception handler to be passed to Apply. | |
573 url_strs: URL strings on which to set ACL. | |
574 | |
575 Raises: | |
576 CommandException if an ACL could not be set. | |
577 """ | |
578 multi_threaded_url_args = [] | |
579 # Handle bucket ACL setting operations single-threaded, because | |
580 # our threading machinery currently assumes it's working with objects | |
581 # (name_expansion_iterator), and normally we wouldn't expect users to need | |
582 # to set ACLs on huge numbers of buckets at once anyway. | |
583 for url_str in url_strs: | |
584 url = StorageUrlFromString(url_str) | |
585 if url.IsCloudUrl() and url.IsBucket(): | |
586 if self.recursion_requested: | |
587 # If user specified -R option, convert any bucket args to bucket | |
588 # wildcards (e.g., gs://bucket/*), to prevent the operation from | |
589 # being applied to the buckets themselves. | |
590 url.object_name = '*' | |
591 multi_threaded_url_args.append(url.url_string) | |
592 else: | |
593 # Convert to a NameExpansionResult so we can re-use the threaded | |
594 # function for the single-threaded implementation. RefType is unused. | |
595 for blr in self.WildcardIterator(url.url_string).IterBuckets( | |
596 bucket_fields=['id']): | |
597 name_expansion_for_url = NameExpansionResult( | |
598 url, False, False, blr.storage_url) | |
599 acl_func(self, name_expansion_for_url) | |
600 else: | |
601 multi_threaded_url_args.append(url_str) | |
602 | |
603 if len(multi_threaded_url_args) >= 1: | |
604 name_expansion_iterator = NameExpansionIterator( | |
605 self.command_name, self.debug, | |
606 self.logger, self.gsutil_api, | |
607 multi_threaded_url_args, self.recursion_requested, | |
608 all_versions=self.all_versions, | |
609 continue_on_error=self.continue_on_error or self.parallel_operations) | |
610 | |
611 # Perform requests in parallel (-m) mode, if requested, using | |
612 # configured number of parallel processes and threads. Otherwise, | |
613 # perform requests with sequential function calls in current process. | |
614 self.Apply(acl_func, name_expansion_iterator, acl_excep_handler, | |
615 fail_on_error=not self.continue_on_error) | |
616 | |
617 if not self.everything_set_okay and not self.continue_on_error: | |
618 raise CommandException('ACLs for some objects could not be set.') | |
619 | |
620 def SetAclFunc(self, name_expansion_result, thread_state=None): | |
621 """Sets the object ACL for the name_expansion_result provided. | |
622 | |
623 Args: | |
624 name_expansion_result: NameExpansionResult describing the target object. | |
625 thread_state: If present, use this gsutil Cloud API instance for the set. | |
626 """ | |
627 if thread_state: | |
628 assert not self.def_acl | |
629 gsutil_api = thread_state | |
630 else: | |
631 gsutil_api = self.gsutil_api | |
632 op_string = 'default object ACL' if self.def_acl else 'ACL' | |
633 url = name_expansion_result.expanded_storage_url | |
634 self.logger.info('Setting %s on %s...', op_string, url) | |
635 if (gsutil_api.GetApiSelector(url.scheme) == ApiSelector.XML | |
636 and url.scheme != 'gs'): | |
637 # If we are called with a non-google ACL model, we need to use the XML | |
638 # passthrough. acl_arg should either be a canned ACL or an XML ACL. | |
639 self._SetAclXmlPassthrough(url, gsutil_api) | |
640 else: | |
641 # Normal Cloud API path. acl_arg is a JSON ACL or a canned ACL. | |
642 self._SetAclGsutilApi(url, gsutil_api) | |
643 | |
644 def _SetAclXmlPassthrough(self, url, gsutil_api): | |
645 """Sets the ACL for the URL provided using the XML passthrough functions. | |
646 | |
647 This function assumes that self.def_acl, self.canned, | |
648 and self.continue_on_error are initialized, and that self.acl_arg is | |
649 either an XML string or a canned ACL string. | |
650 | |
651 Args: | |
652 url: CloudURL to set the ACL on. | |
653 gsutil_api: gsutil Cloud API to use for the ACL set. Must support XML | |
654 passthrough functions. | |
655 """ | |
656 try: | |
657 orig_prefer_api = gsutil_api.prefer_api | |
658 gsutil_api.prefer_api = ApiSelector.XML | |
659 gsutil_api.XmlPassThroughSetAcl( | |
660 self.acl_arg, url, canned=self.canned, | |
661 def_obj_acl=self.def_acl, provider=url.scheme) | |
662 except ServiceException as e: | |
663 if self.continue_on_error: | |
664 self.everything_set_okay = False | |
665 self.logger.error(e) | |
666 else: | |
667 raise | |
668 finally: | |
669 gsutil_api.prefer_api = orig_prefer_api | |
670 | |
671 def _SetAclGsutilApi(self, url, gsutil_api): | |
672 """Sets the ACL for the URL provided using the gsutil Cloud API. | |
673 | |
674 This function assumes that self.def_acl, self.canned, | |
675 and self.continue_on_error are initialized, and that self.acl_arg is | |
676 either a JSON string or a canned ACL string. | |
677 | |
678 Args: | |
679 url: CloudURL to set the ACL on. | |
680 gsutil_api: gsutil Cloud API to use for the ACL set. | |
681 """ | |
682 try: | |
683 if url.IsBucket(): | |
684 if self.def_acl: | |
685 if self.canned: | |
686 gsutil_api.PatchBucket( | |
687 url.bucket_name, apitools_messages.Bucket(), | |
688 canned_def_acl=self.acl_arg, provider=url.scheme, fields=['id']) | |
689 else: | |
690 def_obj_acl = AclTranslation.JsonToMessage( | |
691 self.acl_arg, apitools_messages.ObjectAccessControl) | |
692 bucket_metadata = apitools_messages.Bucket( | |
693 defaultObjectAcl=def_obj_acl) | |
694 gsutil_api.PatchBucket(url.bucket_name, bucket_metadata, | |
695 provider=url.scheme, fields=['id']) | |
696 else: | |
697 if self.canned: | |
698 gsutil_api.PatchBucket( | |
699 url.bucket_name, apitools_messages.Bucket(), | |
700 canned_acl=self.acl_arg, provider=url.scheme, fields=['id']) | |
701 else: | |
702 bucket_acl = AclTranslation.JsonToMessage( | |
703 self.acl_arg, apitools_messages.BucketAccessControl) | |
704 bucket_metadata = apitools_messages.Bucket(acl=bucket_acl) | |
705 gsutil_api.PatchBucket(url.bucket_name, bucket_metadata, | |
706 provider=url.scheme, fields=['id']) | |
707 else: # url.IsObject() | |
708 if self.canned: | |
709 gsutil_api.PatchObjectMetadata( | |
710 url.bucket_name, url.object_name, apitools_messages.Object(), | |
711 provider=url.scheme, generation=url.generation, | |
712 canned_acl=self.acl_arg) | |
713 else: | |
714 object_acl = AclTranslation.JsonToMessage( | |
715 self.acl_arg, apitools_messages.ObjectAccessControl) | |
716 object_metadata = apitools_messages.Object(acl=object_acl) | |
717 gsutil_api.PatchObjectMetadata(url.bucket_name, url.object_name, | |
718 object_metadata, provider=url.scheme, | |
719 generation=url.generation) | |
720 except ArgumentException, e: | |
721 raise | |
722 except ServiceException, e: | |
723 if self.continue_on_error: | |
724 self.everything_set_okay = False | |
725 self.logger.error(e) | |
726 else: | |
727 raise | |
728 | |
729 def SetAclCommandHelper(self, acl_func, acl_excep_handler): | |
730 """Sets ACLs on the self.args using the passed-in acl function. | |
731 | |
732 Args: | |
733 acl_func: ACL function to be passed to Apply. | |
734 acl_excep_handler: ACL exception handler to be passed to Apply. | |
735 """ | |
736 acl_arg = self.args[0] | |
737 url_args = self.args[1:] | |
738 # Disallow multi-provider setacl requests, because there are differences in | |
739 # the ACL models. | |
740 if not UrlsAreForSingleProvider(url_args): | |
741 raise CommandException('"%s" command spanning providers not allowed.' % | |
742 self.command_name) | |
743 | |
744 # Determine whether acl_arg names a file containing XML ACL text vs. the | |
745 # string name of a canned ACL. | |
746 if os.path.isfile(acl_arg): | |
747 with codecs.open(acl_arg, 'r', UTF8) as f: | |
748 acl_arg = f.read() | |
749 self.canned = False | |
750 else: | |
751 # No file exists, so expect a canned ACL string. | |
752 # Canned ACLs are not supported in JSON and we need to use the XML API | |
753 # to set them. | |
754 # validate=False because we allow wildcard urls. | |
755 storage_uri = boto.storage_uri( | |
756 url_args[0], debug=self.debug, validate=False, | |
757 bucket_storage_uri_class=self.bucket_storage_uri_class) | |
758 | |
759 canned_acls = storage_uri.canned_acls() | |
760 if acl_arg not in canned_acls: | |
761 raise CommandException('Invalid canned ACL "%s".' % acl_arg) | |
762 self.canned = True | |
763 | |
764 # Used to track if any ACLs failed to be set. | |
765 self.everything_set_okay = True | |
766 self.acl_arg = acl_arg | |
767 | |
768 self.ApplyAclFunc(acl_func, acl_excep_handler, url_args) | |
769 if not self.everything_set_okay and not self.continue_on_error: | |
770 raise CommandException('ACLs for some objects could not be set.') | |
771 | |
772 def _WarnServiceAccounts(self): | |
773 """Warns service account users who have received an AccessDenied error. | |
774 | |
775 When one of the metadata-related commands fails due to AccessDenied, user | |
776 must ensure that they are listed as an Owner in the API console. | |
777 """ | |
778 # Import this here so that the value will be set first in | |
779 # gcs_oauth2_boto_plugin. | |
780 # pylint: disable=g-import-not-at-top | |
781 from gcs_oauth2_boto_plugin.oauth2_plugin import IS_SERVICE_ACCOUNT | |
782 | |
783 if IS_SERVICE_ACCOUNT: | |
784 # This method is only called when canned ACLs are used, so the warning | |
785 # definitely applies. | |
786 self.logger.warning('\n'.join(textwrap.wrap( | |
787 'It appears that your service account has been denied access while ' | |
788 'attempting to perform a metadata operation. If you believe that you ' | |
789 'should have access to this metadata (i.e., if it is associated with ' | |
790 'your account), please make sure that your service account\'s email ' | |
791 'address is listed as an Owner in the Team tab of the API console. ' | |
792 'See "gsutil help creds" for further information.\n'))) | |
793 | |
794 def GetAndPrintAcl(self, url_str): | |
795 """Prints the standard or default object ACL depending on self.command_name. | |
796 | |
797 Args: | |
798 url_str: URL string to get ACL for. | |
799 """ | |
800 blr = self.GetAclCommandBucketListingReference(url_str) | |
801 url = StorageUrlFromString(url_str) | |
802 if (self.gsutil_api.GetApiSelector(url.scheme) == ApiSelector.XML | |
803 and url.scheme != 'gs'): | |
804 # Need to use XML passthrough. | |
805 try: | |
806 acl = self.gsutil_api.XmlPassThroughGetAcl( | |
807 url, def_obj_acl=self.def_acl, provider=url.scheme) | |
808 print acl.to_xml() | |
809 except AccessDeniedException, _: | |
810 self._WarnServiceAccounts() | |
811 raise | |
812 else: | |
813 if self.command_name == 'defacl': | |
814 acl = blr.root_object.defaultObjectAcl | |
815 if not acl: | |
816 self.logger.warn( | |
817 'No default object ACL present for %s. This could occur if ' | |
818 'the default object ACL is private, in which case objects ' | |
819 'created in this bucket will be readable only by their ' | |
820 'creators. It could also mean you do not have OWNER permission ' | |
821 'on %s and therefore do not have permission to read the ' | |
822 'default object ACL.', url_str, url_str) | |
823 else: | |
824 acl = blr.root_object.acl | |
825 if not acl: | |
826 self._WarnServiceAccounts() | |
827 raise AccessDeniedException('Access denied. Please ensure you have ' | |
828 'OWNER permission on %s.' % url_str) | |
829 print AclTranslation.JsonFromMessage(acl) | |
830 | |
831 def GetAclCommandBucketListingReference(self, url_str): | |
832 """Gets a single bucket listing reference for an acl get command. | |
833 | |
834 Args: | |
835 url_str: URL string to get the bucket listing reference for. | |
836 | |
837 Returns: | |
838 BucketListingReference for the URL string. | |
839 | |
840 Raises: | |
841 CommandException if string did not result in exactly one reference. | |
842 """ | |
843 # We're guaranteed by caller that we have the appropriate type of url | |
844 # string for the call (ex. we will never be called with an object string | |
845 # by getdefacl). | |
846 wildcard_url = StorageUrlFromString(url_str) | |
847 if wildcard_url.IsObject(): | |
848 plurality_iter = PluralityCheckableIterator( | |
849 self.WildcardIterator(url_str).IterObjects( | |
850 bucket_listing_fields=['acl'])) | |
851 else: | |
852 # Bucket or provider. We call IterBuckets explicitly here to ensure that | |
853 # the root object is populated with the acl. | |
854 if self.command_name == 'defacl': | |
855 bucket_fields = ['defaultObjectAcl'] | |
856 else: | |
857 bucket_fields = ['acl'] | |
858 plurality_iter = PluralityCheckableIterator( | |
859 self.WildcardIterator(url_str).IterBuckets( | |
860 bucket_fields=bucket_fields)) | |
861 if plurality_iter.IsEmpty(): | |
862 raise CommandException('No URLs matched') | |
863 if plurality_iter.HasPlurality(): | |
864 raise CommandException( | |
865 '%s matched more than one URL, which is not allowed by the %s ' | |
866 'command' % (url_str, self.command_name)) | |
867 return list(plurality_iter)[0] | |
868 | |
869 def _HandleMultiProcessingSigs(self, unused_signal_num, | |
870 unused_cur_stack_frame): | |
871 """Handles signals INT AND TERM during a multi-process/multi-thread request. | |
872 | |
873 Kills subprocesses. | |
874 | |
875 Args: | |
876 unused_signal_num: signal generated by ^C. | |
877 unused_cur_stack_frame: Current stack frame. | |
878 """ | |
879 # Note: This only works under Linux/MacOS. See | |
880 # https://github.com/GoogleCloudPlatform/gsutil/issues/99 for details | |
881 # about why making it work correctly across OS's is harder and still open. | |
882 ShutDownGsutil() | |
883 sys.stderr.write('Caught ^C - exiting\n') | |
884 # Simply calling sys.exit(1) doesn't work - see above bug for details. | |
885 KillProcess(os.getpid()) | |
886 | |
887 def GetSingleBucketUrlFromArg(self, arg, bucket_fields=None): | |
888 """Gets a single bucket URL based on the command arguments. | |
889 | |
890 Args: | |
891 arg: String argument to get bucket URL for. | |
892 bucket_fields: Fields to populate for the bucket. | |
893 | |
894 Returns: | |
895 (StorageUrl referring to a single bucket, Bucket metadata). | |
896 | |
897 Raises: | |
898 CommandException if args did not match exactly one bucket. | |
899 """ | |
900 plurality_checkable_iterator = self.GetBucketUrlIterFromArg( | |
901 arg, bucket_fields=bucket_fields) | |
902 if plurality_checkable_iterator.HasPlurality(): | |
903 raise CommandException( | |
904 '%s matched more than one URL, which is not\n' | |
905 'allowed by the %s command' % (arg, self.command_name)) | |
906 blr = list(plurality_checkable_iterator)[0] | |
907 return StorageUrlFromString(blr.url_string), blr.root_object | |
908 | |
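A hedged sketch of a caller; the 'id' bucket field mirrors its use elsewhere in this file, and the log line is illustrative:

```python
# Sketch only: resolve exactly one bucket from the first positional argument.
bucket_url, bucket_metadata = self.GetSingleBucketUrlFromArg(
    self.args[0], bucket_fields=['id'])
self.logger.info('Operating on bucket %s', bucket_url.bucket_name)
```
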
909 def GetBucketUrlIterFromArg(self, arg, bucket_fields=None): | |
910 """Gets a single bucket URL based on the command arguments. | |
911 | |
912 Args: | |
913 arg: String argument to iterate over. | |
914 bucket_fields: Fields to populate for the bucket. | |
915 | |
916 Returns: | |
917 PluralityCheckableIterator over buckets. | |
918 | |
919 Raises: | |
920 CommandException if iterator matched no buckets. | |
921 """ | |
922 arg_url = StorageUrlFromString(arg) | |
923 if not arg_url.IsCloudUrl() or arg_url.IsObject(): | |
924 raise CommandException('"%s" command must specify a bucket' % | |
925 self.command_name) | |
926 | |
927 plurality_checkable_iterator = PluralityCheckableIterator( | |
928 self.WildcardIterator(arg).IterBuckets( | |
929 bucket_fields=bucket_fields)) | |
930 if plurality_checkable_iterator.IsEmpty(): | |
931 raise CommandException('No URLs matched') | |
932 return plurality_checkable_iterator | |
933 | |
934 ###################### | |
935 # Private functions. # | |
936 ###################### | |
937 | |
938 def _ResetConnectionPool(self): | |
939 # Each OS process needs to establish its own set of connections to | |
940 # the server to avoid writes from different OS processes interleaving | |
941 # onto the same socket (and garbling the underlying SSL session). | |
942 # We ensure each process gets its own set of connections here by | |
943 # closing all connections in the storage provider connection pool. | |
944 connection_pool = StorageUri.provider_pool | |
945 if connection_pool: | |
946 for i in connection_pool: | |
947 connection_pool[i].connection.close() | |
948 | |
949 def _GetProcessAndThreadCount(self, process_count, thread_count, | |
950 parallel_operations_override): | |
951 """Determines the values of process_count and thread_count. | |
952 | |
953 These values are used for parallel operations. | |
954 If we're not performing operations in parallel, then ignore | |
955 existing values and use process_count = thread_count = 1. | |
956 | |
957 Args: | |
958 process_count: A positive integer or None. In the latter case, we read | |
959 the value from the .boto config file. | |
960 thread_count: A positive integer or None. In the latter case, we read | |
961 the value from the .boto config file. | |
962 parallel_operations_override: Used to override self.parallel_operations. | |
963 This allows the caller to safely override | |
964 the top-level flag for a single call. | |
965 | |
966 Returns: | |
967 (process_count, thread_count): The number of processes and threads to use, | |
968 respectively. | |
969 """ | |
970 # Set OS process and python thread count as a function of options | |
971 # and config. | |
972 if self.parallel_operations or parallel_operations_override: | |
973 if not process_count: | |
974 process_count = boto.config.getint( | |
975 'GSUtil', 'parallel_process_count', | |
976 gslib.commands.config.DEFAULT_PARALLEL_PROCESS_COUNT) | |
977 if process_count < 1: | |
978 raise CommandException('Invalid parallel_process_count "%d".' % | |
979 process_count) | |
980 if not thread_count: | |
981 thread_count = boto.config.getint( | |
982 'GSUtil', 'parallel_thread_count', | |
983 gslib.commands.config.DEFAULT_PARALLEL_THREAD_COUNT) | |
984 if thread_count < 1: | |
985 raise CommandException('Invalid parallel_thread_count "%d".' % | |
986 thread_count) | |
987 else: | |
988 # If -m not specified, then assume 1 OS process and 1 Python thread. | |
989 process_count = 1 | |
990 thread_count = 1 | |
991 | |
992 if IS_WINDOWS and process_count > 1: | |
993 raise CommandException('\n'.join(textwrap.wrap( | |
994 ('It is not possible to set process_count > 1 on Windows. Please ' | |
995 'update your config file (located at %s) and set ' | |
996 '"parallel_process_count = 1".') % | |
997 GetConfigFilePath()))) | |
998 self.logger.debug('process count: %d', process_count) | |
999 self.logger.debug('thread count: %d', thread_count) | |
1000 | |
1001 return (process_count, thread_count) | |
1002 | |
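When process_count or thread_count is None, they are read from the [GSUtil] section of the user's .boto file, as the code above shows. A hedged example stanza (values are illustrative, not recommendations):

```
[GSUtil]
parallel_process_count = 8
parallel_thread_count = 10
```
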
1003 def _SetUpPerCallerState(self): | |
1004 """Set up the state for a caller id, corresponding to one Apply call.""" | |
1005 # Get a new caller ID. | |
1006 with caller_id_lock: | |
1007 caller_id_counter.value += 1 | |
1008 caller_id = caller_id_counter.value | |
1009 | |
1010 # Create a copy of self with an incremented recursive level. This allows | |
1011 # the class to report its level correctly if the function called from it | |
1012 # also needs to call Apply. | |
1013 cls = copy.copy(self) | |
1014 cls.recursive_apply_level += 1 | |
1015 | |
1016 # Thread-safe loggers can't be pickled, so we will remove it here and | |
1017 # recreate it later in the WorkerThread. This is not a problem since any | |
1018 # logger with the same name will be treated as a singleton. | |
1019 cls.logger = None | |
1020 | |
1021 # Likewise, the default API connection can't be pickled, but it is unused | |
1022 # anyway as each thread gets its own API delegator. | |
1023 cls.gsutil_api = None | |
1024 | |
1025 class_map[caller_id] = cls | |
1026 total_tasks[caller_id] = -1 # -1 => the producer hasn't finished yet. | |
1027 call_completed_map[caller_id] = False | |
1028 caller_id_finished_count.Put(caller_id, 0) | |
1029 global_return_values_map.Put(caller_id, []) | |
1030 return caller_id | |
1031 | |
1032 def _CreateNewConsumerPool(self, num_processes, num_threads): | |
1033 """Create a new pool of processes that call _ApplyThreads.""" | |
1034 processes = [] | |
1035 task_queue = _NewMultiprocessingQueue() | |
1036 task_queues.append(task_queue) | |
1037 | |
1038 current_max_recursive_level.value += 1 | |
1039 if current_max_recursive_level.value > MAX_RECURSIVE_DEPTH: | |
1040 raise CommandException('Recursion depth of Apply calls is too great.') | |
1041 for _ in range(num_processes): | |
1042 recursive_apply_level = len(consumer_pools) | |
1043 p = multiprocessing.Process( | |
1044 target=self._ApplyThreads, | |
1045 args=(num_threads, num_processes, recursive_apply_level)) | |
1046 p.daemon = True | |
1047 processes.append(p) | |
1048 p.start() | |
1049 consumer_pool = _ConsumerPool(processes, task_queue) | |
1050 consumer_pools.append(consumer_pool) | |
1051 | |
1052 def Apply(self, func, args_iterator, exception_handler, | |
1053 shared_attrs=None, arg_checker=_UrlArgChecker, | |
1054 parallel_operations_override=False, process_count=None, | |
1055 thread_count=None, should_return_results=False, | |
1056 fail_on_error=False): | |
1057 """Calls _Parallel/SequentialApply based on multiprocessing availability. | |
1058 | |
1059 Args: | |
1060 func: Function to call to process each argument. | |
1061 args_iterator: Iterable collection of arguments to be put into the | |
1062 work queue. | |
1063 exception_handler: Exception handler for WorkerThread class. | |
1064 shared_attrs: List of attributes to manage across sub-processes. | |
1065 arg_checker: Used to determine whether we should process the current | |
1066 argument or simply skip it. Also handles any logging that | |
1067 is specific to a particular type of argument. | |
1068 parallel_operations_override: Used to override self.parallel_operations. | |
1069 This allows the caller to safely override | |
1070 the top-level flag for a single call. | |
1071 process_count: The number of processes to use. If not specified, then | |
1072 the configured default will be used. | |
1073 thread_count: The number of threads per process. If not specified, then | |
1074 the configured default will be used. | |
1075 should_return_results: If true, then return the results of all successful | |
1076 calls to func in a list. | |
1077 fail_on_error: If true, then raise any exceptions encountered when | |
1078 executing func. This is only applicable in the case of | |
1079 process_count == thread_count == 1. | |
1080 | |
1081 Returns: | |
1082 Results from spawned threads. | |
1083 """ | |
1084 if shared_attrs: | |
1085 original_shared_vars_values = {} # We'll add these back in at the end. | |
1086 for name in shared_attrs: | |
1087 original_shared_vars_values[name] = getattr(self, name) | |
1088 # By setting this to 0, we simplify the logic for computing deltas. | |
1089 # We'll add it back after all of the tasks have been performed. | |
1090 setattr(self, name, 0) | |
1091 | |
1092 (process_count, thread_count) = self._GetProcessAndThreadCount( | |
1093 process_count, thread_count, parallel_operations_override) | |
1094 | |
1095 is_main_thread = (self.recursive_apply_level == 0 | |
1096 and self.sequential_caller_id == -1) | |
1097 | |
1098 # We don't honor the fail_on_error flag in the case of multiple threads | |
1099 # or processes. | |
1100 fail_on_error = fail_on_error and (process_count * thread_count == 1) | |
1101 | |
1102 # Only check this from the first call in the main thread. Apart from the | |
1103 # fact that it's wasteful to try this multiple times in general, it also | |
1104 # will never work when called from a subprocess since we use daemon | |
1105 # processes, and daemons can't create other processes. | |
1106 if is_main_thread: | |
1107 if ((not self.multiprocessing_is_available) | |
1108 and thread_count * process_count > 1): | |
1109 # Run the check again and log the appropriate warnings. This was run | |
1110 # before, when the Command object was created, in order to calculate | |
1111 # self.multiprocessing_is_available, but we don't want to print the | |
1112 # warning until we're sure the user actually tried to use multiple | |
1113 # threads or processes. | |
1114 MultiprocessingIsAvailable(logger=self.logger) | |
1115 | |
1116 if self.multiprocessing_is_available: | |
1117 caller_id = self._SetUpPerCallerState() | |
1118 else: | |
1119 self.sequential_caller_id += 1 | |
1120 caller_id = self.sequential_caller_id | |
1121 | |
1122 if is_main_thread: | |
1123 # pylint: disable=global-variable-undefined | |
1124 global global_return_values_map, shared_vars_map, failure_count | |
1125 global caller_id_finished_count, shared_vars_list_map | |
1126 global_return_values_map = BasicIncrementDict() | |
1127 global_return_values_map.Put(caller_id, []) | |
1128 shared_vars_map = BasicIncrementDict() | |
1129 caller_id_finished_count = BasicIncrementDict() | |
1130 shared_vars_list_map = {} | |
1131 failure_count = 0 | |
1132 | |
1133 # If any shared attributes passed by caller, create a dictionary of | |
1134 # shared memory variables for every element in the list of shared | |
1135 # attributes. | |
1136 if shared_attrs: | |
1137 shared_vars_list_map[caller_id] = shared_attrs | |
1138 for name in shared_attrs: | |
1139 shared_vars_map.Put((caller_id, name), 0) | |
1140 | |
1141 # Make all of the requested function calls. | |
1142 if self.multiprocessing_is_available and thread_count * process_count > 1: | |
1143 self._ParallelApply(func, args_iterator, exception_handler, caller_id, | |
1144 arg_checker, process_count, thread_count, | |
1145 should_return_results, fail_on_error) | |
1146 else: | |
1147 self._SequentialApply(func, args_iterator, exception_handler, caller_id, | |
1148 arg_checker, should_return_results, fail_on_error) | |
1149 | |
1150 if shared_attrs: | |
1151 for name in shared_attrs: | |
1152 # This allows us to retain the original value of the shared variable, | |
1153 # and simply apply the delta after what was done during the call to | |
1154 # apply. | |
1155 final_value = (original_shared_vars_values[name] + | |
1156 shared_vars_map.Get((caller_id, name))) | |
1157 setattr(self, name, final_value) | |
1158 | |
1159 if should_return_results: | |
1160 return global_return_values_map.Get(caller_id) | |
1161 | |
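A hedged sketch of a typical call, modeled on ApplyAclFunc earlier in this class; the shared_attrs note uses a hypothetical counter attribute:

```python
# Sketch only: fan the ACL-setting function out over each expanded name,
# in parallel when -m was specified, sequentially otherwise.
self.Apply(SetAclFuncWrapper, name_expansion_iterator, SetAclExceptionHandler,
           fail_on_error=not self.continue_on_error)

# With shared_attrs, per-worker deltas to the named numeric attributes are
# added back onto this instance once all tasks finish, e.g.
# shared_attrs=['total_bytes_transferred'] for a hypothetical counter.
```
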
1162 def _MaybeSuggestGsutilDashM(self): | |
1163 """Outputs a sugestion to the user to use gsutil -m.""" | |
1164 if not (boto.config.getint('GSUtil', 'parallel_process_count', 0) == 1 and | |
1165 boto.config.getint('GSUtil', 'parallel_thread_count', 0) == 1): | |
1166 self.logger.info('\n' + textwrap.fill( | |
1167 '==> NOTE: You are performing a sequence of gsutil operations that ' | |
1168 'may run significantly faster if you instead use gsutil -m %s ...\n' | |
1169 'Please see the -m section under "gsutil help options" for further ' | |
1170 'information about when gsutil -m can be advantageous.' | |
1171 % sys.argv[1]) + '\n') | |
1172 | |
1173 # pylint: disable=g-doc-args | |
1174 def _SequentialApply(self, func, args_iterator, exception_handler, caller_id, | |
1175 arg_checker, should_return_results, fail_on_error): | |
1176 """Performs all function calls sequentially in the current thread. | |
1177 | |
1178 No other threads or processes will be spawned. This degraded functionality | |
1179 is used when the multiprocessing module is not available or the user | |
1180 requests only one thread and one process. | |
1181 """ | |
1182 # Create a WorkerThread to handle all of the logic needed to actually call | |
1183 # the function. Note that this thread will never be started, and all work | |
1184 # is done in the current thread. | |
1185 worker_thread = WorkerThread(None, False) | |
1186 args_iterator = iter(args_iterator) | |
1187 # Count of sequential calls that have been made. Used for producing | |
1188 # suggestion to use gsutil -m. | |
1189 sequential_call_count = 0 | |
1190 while True: | |
1191 | |
1192 # Try to get the next argument, handling any exceptions that arise. | |
1193 try: | |
1194 args = args_iterator.next() | |
1195 except StopIteration, e: | |
1196 break | |
1197 except Exception, e: # pylint: disable=broad-except | |
1198 _IncrementFailureCount() | |
1199 if fail_on_error: | |
1200 raise | |
1201 else: | |
1202 try: | |
1203 exception_handler(self, e) | |
1204 except Exception, _: # pylint: disable=broad-except | |
1205 self.logger.debug( | |
1206 'Caught exception while handling exception for %s:\n%s', | |
1207 func, traceback.format_exc()) | |
1208 continue | |
1209 | |
1210 sequential_call_count += 1 | |
1211 if sequential_call_count == OFFER_GSUTIL_M_SUGGESTION_THRESHOLD: | |
1212 # Output suggestion near beginning of run, so user sees it early and can | |
1213 # ^C and try gsutil -m. | |
1214 self._MaybeSuggestGsutilDashM() | |
1215 if arg_checker(self, args): | |
1216 # Now that we actually have the next argument, perform the task. | |
1217 task = Task(func, args, caller_id, exception_handler, | |
1218 should_return_results, arg_checker, fail_on_error) | |
1219 worker_thread.PerformTask(task, self) | |
1220 if sequential_call_count >= gslib.util.GetTermLines(): | |
1221 # Output suggestion at end of long run, in case user missed it at the | |
1222 # start and it scrolled off-screen. | |
1223 self._MaybeSuggestGsutilDashM() | |
1224 | |
1225 # pylint: disable=g-doc-args | |
1226 def _ParallelApply(self, func, args_iterator, exception_handler, caller_id, | |
1227 arg_checker, process_count, thread_count, | |
1228 should_return_results, fail_on_error): | |
1229 """Dispatches input arguments across a thread/process pool. | |
1230 | |
1231 Pools are composed of parallel OS processes and/or Python threads, | |
1232 based on options (-m or not) and settings in the user's config file. | |
1233 | |
1234 If only one OS process is requested/available, dispatch requests across | |
1235 threads in the current OS process. | |
1236 | |
1237 In the multi-process case, we will create one pool of worker processes for | |
1238 each level of the tree of recursive calls to Apply. E.g., if A calls | |
1239 Apply(B), and B ultimately calls Apply(C) followed by Apply(D), then we | |
1240 will only create two sets of worker processes - B will execute in the first, | |
1241 and C and D will execute in the second. If C is then changed to call | |
1242 Apply(E) and D is changed to call Apply(F), then we will automatically | |
1243 create a third set of processes (lazily, when needed) that will be used to | |
1244 execute calls to E and F. This might look something like: | |
1245 | |
1246 Pool1 Executes: B | |
1247 / \ | |
1248 Pool2 Executes: C D | |
1249 / \ | |
1250 Pool3 Executes: E F | |
1251 | |
1252 Apply's parallelism is generally broken up into 4 cases: | |
1253 - If process_count == thread_count == 1, then all tasks will be executed | |
1254 by _SequentialApply. | |
1255 - If process_count > 1 and thread_count == 1, then the main thread will | |
1256 create a new pool of processes (if they don't already exist) and each of | |
1257 those processes will execute the tasks in a single thread. | |
1258 - If process_count == 1 and thread_count > 1, then this process will create | |
1259 a new pool of threads to execute the tasks. | |
1260 - If process_count > 1 and thread_count > 1, then the main thread will | |
1261 create a new pool of processes (if they don't already exist) and each of | |
1262 those processes will, upon creation, create a pool of threads to | |
1263 execute the tasks. | |
1264 | |
1265 Args: | |
1266 caller_id: The caller ID unique to this call to command.Apply. | |
1267 See command.Apply for description of other arguments. | |
1268 """ | |
1269 is_main_thread = self.recursive_apply_level == 0 | |
1270 | |
1271 # Catch SIGINT and SIGTERM under Linux/MacOs so we can do cleanup before | |
1272 # exiting. | |
1273 if not IS_WINDOWS and is_main_thread: | |
1274 # Register as a final signal handler because this handler kills the | |
1275 # main gsutil process (so it must run last). | |
1276 RegisterSignalHandler(signal.SIGINT, self._HandleMultiProcessingSigs, | |
1277 is_final_handler=True) | |
1278 RegisterSignalHandler(signal.SIGTERM, self._HandleMultiProcessingSigs, | |
1279 is_final_handler=True) | |
1280 | |
1281 if not task_queues: | |
1282 # The process we create will need to access the next recursive level | |
1283 # of task queues if it makes a call to Apply, so we always keep around | |
1284 # one more queue than we know we need. OTOH, if we don't create a new | |
1285 # process, the existing process still needs a task queue to use. | |
1286 task_queues.append(_NewMultiprocessingQueue()) | |
1287 | |
1288 if process_count > 1: # Handle process pool creation. | |
1289 # Check whether this call will need a new set of workers. | |
1290 | |
1291 # Each worker must acquire a shared lock before notifying the main thread | |
1292 # that it needs a new worker pool, so that at most one worker asks for | |
1293 # a new worker pool at once. | |
1294 try: | |
1295 if not is_main_thread: | |
1296 worker_checking_level_lock.acquire() | |
1297 if self.recursive_apply_level >= current_max_recursive_level.value: | |
1298 with need_pool_or_done_cond: | |
1299 # Only the main thread is allowed to create new processes - | |
1300 # otherwise, we will run into some Python bugs. | |
1301 if is_main_thread: | |
1302 self._CreateNewConsumerPool(process_count, thread_count) | |
1303 else: | |
1304 # Notify the main thread that we need a new consumer pool. | |
1305 new_pool_needed.value = 1 | |
1306 need_pool_or_done_cond.notify_all() | |
1307 # The main thread will notify us when it finishes. | |
1308 need_pool_or_done_cond.wait() | |
1309 finally: | |
1310 if not is_main_thread: | |
1311 worker_checking_level_lock.release() | |
1312 | |
1313 # If process_count > 1, consumer pools are (or will be) consuming from the | |
1314 # shared task queues, so use the queue for our recursion level. Otherwise, | |
1315 # we're running the tasks in this process and need a separate task queue. | |
1316 if process_count > 1: | |
1317 task_queue = task_queues[self.recursive_apply_level] | |
1318 else: | |
1319 task_queue = _NewMultiprocessingQueue() | |
1320 | |
1321 # Kick off a producer thread to throw tasks in the global task queue. We | |
1322 # do this asynchronously so that the main thread can be free to create new | |
1323 # consumer pools when needed (otherwise, any thread with a task that needs | |
1324 # a new consumer pool must block until we're completely done producing; in | |
1325 # the worst case, every worker blocks on such a call and the producer fills | |
1326 # up the task queue before it finishes, so we block forever). | |
1327 producer_thread = ProducerThread(copy.copy(self), args_iterator, caller_id, | |
1328 func, task_queue, should_return_results, | |
1329 exception_handler, arg_checker, | |
1330 fail_on_error) | |
1331 | |
1332 if process_count > 1: | |
1333 # Wait here until either: | |
1334 # 1. We're the main thread and someone needs a new consumer pool - in | |
1335 # which case we create one and continue waiting. | |
1336 # 2. Someone notifies us that all of the work we requested is done, in | |
1337 # which case we retrieve the results (if applicable) and stop | |
1338 # waiting. | |
1339 while True: | |
1340 with need_pool_or_done_cond: | |
1341 # Either our call is done, or someone needs a new level of consumer | |
1342 # pools, or the wakeup call was meant for someone else. It's | |
1343 # impossible for both conditions to be true, since the main thread is | |
1344 # blocked on any other ongoing calls to Apply, and a thread would not | |
1345 # ask for a new consumer pool unless it had more work to do. | |
1346 if call_completed_map[caller_id]: | |
1347 break | |
1348 elif is_main_thread and new_pool_needed.value: | |
1349 new_pool_needed.value = 0 | |
1350 self._CreateNewConsumerPool(process_count, thread_count) | |
1351 need_pool_or_done_cond.notify_all() | |
1352 | |
1353 # Note that we must check the above conditions before the wait() call; | |
1354 # otherwise, the notification can happen before we start waiting, in | |
1355 # which case we'll block forever. | |
1356 need_pool_or_done_cond.wait() | |
1357 else: # Using a single process. | |
1358 self._ApplyThreads(thread_count, process_count, | |
1359 self.recursive_apply_level, | |
1360 is_blocking_call=True, task_queue=task_queue) | |
1361 | |
1362 # If the producer thread encountered an exception before any arguments were | |
1363 # enqueued, it wouldn't have been propagated to this thread, so explicitly | |
1364 # raise it here. | |
1365 if producer_thread.unknown_exception: | |
1366 # pylint: disable=raising-bad-type | |
1367 raise producer_thread.unknown_exception | |
1368 | |
1369 # If the producer thread encountered an exception while iterating over the | |
1370 # arguments, raise it here if we're meant to fail on error. | |
1371 if producer_thread.iterator_exception and fail_on_error: | |
1372 # pylint: disable=raising-bad-type | |
1373 raise producer_thread.iterator_exception | |
1374 | |
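# A hedged usage sketch (hypothetical caller, assuming the Apply signature
# defined earlier in this file): commands hand Apply a function, an argument
# iterator, and an exception handler, and Apply chooses between
# _SequentialApply and _ParallelApply based on the process and thread counts:
#
#   results = self.Apply(SetAclFuncWrapper, name_expansion_iterator,
#                        _DefaultExceptionHandler,
#                        should_return_results=True, fail_on_error=False)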
1375 def _ApplyThreads(self, thread_count, process_count, recursive_apply_level, | |
1376 is_blocking_call=False, task_queue=None): | |
1377 """Assigns the work from the multi-process global task queue. | |
1378 | |
1379 Work is assigned to an individual process for later consumption either by | |
1380 the WorkerThreads or (if thread_count == 1) this thread. | |
1381 | |
1382 Args: | |
1383 thread_count: The number of threads used to perform the work. If 1, then | |
1384 perform all work in this thread. | |
1385 process_count: The number of processes used to perform the work. | |
1386 recursive_apply_level: The depth in the tree of recursive calls to Apply | |
1387 of this thread. | |
1388 is_blocking_call: True iff the call to Apply is blocked on this call | |
1389 (which is true iff process_count == 1), implying that | |
1390 _ApplyThreads must behave as a blocking call. | |
1391 """ | |
1392 self._ResetConnectionPool() | |
1393 self.recursive_apply_level = recursive_apply_level | |
1394 | |
1395 task_queue = task_queue or task_queues[recursive_apply_level] | |
1396 | |
1397 assert thread_count * process_count > 1, ( | |
1398 'Invalid state, calling command._ApplyThreads with only one thread ' | |
1399 'and process.') | |
1400 worker_pool = WorkerPool( | |
1401 thread_count, self.logger, | |
1402 bucket_storage_uri_class=self.bucket_storage_uri_class, | |
1403 gsutil_api_map=self.gsutil_api_map, debug=self.debug) | |
1404 | |
1405 num_enqueued = 0 | |
1406 while True: | |
1407 task = task_queue.get() | |
1408 if task.args != ZERO_TASKS_TO_DO_ARGUMENT: | |
1409 # If we have no tasks to do and we're performing a blocking call, we | |
1410 # need a special signal to tell us to stop - otherwise, we block on | |
1411 # the call to task_queue.get() forever. | |
1412 worker_pool.AddTask(task) | |
1413 num_enqueued += 1 | |
1414 | |
1415 if is_blocking_call: | |
1416 num_to_do = total_tasks[task.caller_id] | |
1417 # The producer thread won't enqueue the last task until after it has | |
1418 # updated total_tasks[caller_id], so we know that num_to_do < 0 implies | |
1419 # we will do this check again. | |
1420 if num_to_do >= 0 and num_enqueued == num_to_do: | |
1421 if thread_count == 1: | |
1422 return | |
1423 else: | |
1424 while True: | |
1425 with need_pool_or_done_cond: | |
1426 if call_completed_map[task.caller_id]: | |
1427 # We need to check this first, in case the condition was | |
1428 # notified before we grabbed the lock. | |
1429 return | |
1430 need_pool_or_done_cond.wait() | |
1431 | |
1432 | |
1433 # Below here lie classes and functions related to controlling the flow of tasks | |
1434 # between various threads and processes. | |
1435 | |
1436 | |
1437 class _ConsumerPool(object): | |
1438 | |
1439 def __init__(self, processes, task_queue): | |
1440 self.processes = processes | |
1441 self.task_queue = task_queue | |
1442 | |
1443 def ShutDown(self): | |
1444 for process in self.processes: | |
1445 KillProcess(process.pid) | |
1446 | |
1447 | |
1448 def KillProcess(pid): | |
1449 """Make best effort to kill the given process. | |
1450 | |
1451 We ignore all exceptions so a caller looping through a list of processes will | |
1452 continue attempting to kill each, even if one encounters a problem. | |
1453 | |
1454 Args: | |
1455 pid: The process ID. | |
1456 """ | |
1457 try: | |
1458 # os.kill doesn't work on Windows before Python 2.7 or 3.2. | |
1459 if IS_WINDOWS and ((2, 6) <= sys.version_info[:3] < (2, 7) or | |
1460 (3, 0) <= sys.version_info[:3] < (3, 2)): | |
1461 kernel32 = ctypes.windll.kernel32 | |
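# Access right 1 is PROCESS_TERMINATE, the minimum access needed for the
# TerminateProcess call below.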
1462 handle = kernel32.OpenProcess(1, 0, pid) | |
1463 kernel32.TerminateProcess(handle, 0) | |
1464 else: | |
1465 os.kill(pid, signal.SIGKILL) | |
1466 except: # pylint: disable=bare-except | |
1467 pass | |
1468 | |
1469 | |
1470 class Task(namedtuple('Task', ( | |
1471 'func args caller_id exception_handler should_return_results arg_checker ' | |
1472 'fail_on_error'))): | |
1473 """Task class representing work to be completed. | |
1474 | |
1475 Args: | |
1476 func: The function to be executed. | |
1477 args: The arguments to func. | |
1478 caller_id: The globally-unique caller ID corresponding to the Apply call. | |
1479 exception_handler: The exception handler to use if the call to func fails. | |
1480 should_return_results: True iff the results of this function should be | |
1481 returned from the Apply call. | |
1482 arg_checker: Used to determine whether we should process the current | |
1483 argument or simply skip it. Also handles any logging that | |
1484 is specific to a particular type of argument. | |
1485 fail_on_error: If true, then raise any exceptions encountered when | |
1486 executing func. This is only applicable in the case of | |
1487 process_count == thread_count == 1. | |
1488 """ | |
1489 pass | |
1490 | |
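# A hedged illustration (hypothetical values) of how a Task is assembled; in
# practice _SequentialApply and ProducerThread build these from the arguments
# passed to Apply:
#
#   task = Task(func=SetAclFuncWrapper, args=name_expansion_result,
#               caller_id=0, exception_handler=_DefaultExceptionHandler,
#               should_return_results=False, arg_checker=arg_checker,
#               fail_on_error=False)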
1491 | |
1492 class ProducerThread(threading.Thread): | |
1493 """Thread used to enqueue work for other processes and threads.""" | |
1494 | |
1495 def __init__(self, cls, args_iterator, caller_id, func, task_queue, | |
1496 should_return_results, exception_handler, arg_checker, | |
1497 fail_on_error): | |
1498 """Initializes the producer thread. | |
1499 | |
1500 Args: | |
1501 cls: Instance of Command for which this ProducerThread was created. | |
1502 args_iterator: Iterable collection of arguments to be put into the | |
1503 work queue. | |
1504 caller_id: Globally-unique caller ID corresponding to this call to Apply. | |
1505 func: The function to be called on each element of args_iterator. | |
1506 task_queue: The queue into which tasks will be put, to later be consumed | |
1507 by Command._ApplyThreads. | |
1508 should_return_results: True iff the results for this call to command.Apply | |
1509 were requested. | |
1510 exception_handler: The exception handler to use when errors are | |
1511 encountered during calls to func. | |
1512 arg_checker: Used to determine whether we should process the current | |
1513 argument or simply skip it. Also handles any logging that | |
1514 is specific to a particular type of argument. | |
1515 fail_on_error: If true, then raise any exceptions encountered when | |
1516 executing func. This is only applicable in the case of | |
1517 process_count == thread_count == 1. | |
1518 """ | |
1519 super(ProducerThread, self).__init__() | |
1520 self.func = func | |
1521 self.cls = cls | |
1522 self.args_iterator = args_iterator | |
1523 self.caller_id = caller_id | |
1524 self.task_queue = task_queue | |
1525 self.arg_checker = arg_checker | |
1526 self.exception_handler = exception_handler | |
1527 self.should_return_results = should_return_results | |
1528 self.fail_on_error = fail_on_error | |
1529 self.shared_variables_updater = _SharedVariablesUpdater() | |
1530 self.daemon = True | |
1531 self.unknown_exception = None | |
1532 self.iterator_exception = None | |
1533 self.start() | |
1534 | |
1535 def run(self): | |
1536 num_tasks = 0 | |
1537 cur_task = None | |
1538 last_task = None | |
1539 try: | |
1540 args_iterator = iter(self.args_iterator) | |
1541 while True: | |
1542 try: | |
1543 args = args_iterator.next() | |
1544 except StopIteration, e: | |
1545 break | |
1546 except Exception, e: # pylint: disable=broad-except | |
1547 _IncrementFailureCount() | |
1548 if self.fail_on_error: | |
1549 self.iterator_exception = e | |
1550 raise | |
1551 else: | |
1552 try: | |
1553 self.exception_handler(self.cls, e) | |
1554 except Exception, _: # pylint: disable=broad-except | |
1555 self.cls.logger.debug( | |
1556 'Caught exception while handling exception for %s:\n%s', | |
1557 self.func, traceback.format_exc()) | |
1558 self.shared_variables_updater.Update(self.caller_id, self.cls) | |
1559 continue | |
1560 | |
1561 if self.arg_checker(self.cls, args): | |
1562 num_tasks += 1 | |
1563 last_task = cur_task | |
1564 cur_task = Task(self.func, args, self.caller_id, | |
1565 self.exception_handler, self.should_return_results, | |
1566 self.arg_checker, self.fail_on_error) | |
1567 if last_task: | |
1568 self.task_queue.put(last_task) | |
1569 except Exception, e: # pylint: disable=broad-except | |
1570 # This will also catch any exception raised due to an error in the | |
1571 # iterator when fail_on_error is set, so check that we failed for some | |
1572 # other reason before claiming that we had an unknown exception. | |
1573 if not self.iterator_exception: | |
1574 self.unknown_exception = e | |
1575 finally: | |
1576 # We need to make sure to update total_tasks[caller_id] before we enqueue | |
1577 # the last task. Otherwise, a worker can retrieve the last task and | |
1578 # complete it, then check total_tasks and determine that we're not done | |
1579 # producing all before we update total_tasks. This approach forces workers | |
1580 # to wait on the last task until after we've updated total_tasks. | |
1581 total_tasks[self.caller_id] = num_tasks | |
1582 if not cur_task: | |
1583 # This happens if there were zero arguments to be put in the queue. | |
1584 cur_task = Task(None, ZERO_TASKS_TO_DO_ARGUMENT, self.caller_id, | |
1585 None, None, None, None) | |
1586 self.task_queue.put(cur_task) | |
1587 | |
1588 # It's possible that the workers finished before we updated total_tasks, | |
1589 # so we need to check here as well. | |
1590 _NotifyIfDone(self.caller_id, | |
1591 caller_id_finished_count.Get(self.caller_id)) | |
1592 | |
1593 | |
1594 class WorkerPool(object): | |
1595 """Pool of worker threads to which tasks can be added.""" | |
1596 | |
1597 def __init__(self, thread_count, logger, bucket_storage_uri_class=None, | |
1598 gsutil_api_map=None, debug=0): | |
1599 self.task_queue = _NewThreadsafeQueue() | |
1600 self.threads = [] | |
1601 for _ in range(thread_count): | |
1602 worker_thread = WorkerThread( | |
1603 self.task_queue, logger, | |
1604 bucket_storage_uri_class=bucket_storage_uri_class, | |
1605 gsutil_api_map=gsutil_api_map, debug=debug) | |
1606 self.threads.append(worker_thread) | |
1607 worker_thread.start() | |
1608 | |
1609 def AddTask(self, task): | |
1610 self.task_queue.put(task) | |
1611 | |
1612 | |
1613 class WorkerThread(threading.Thread): | |
1614 """Thread where all the work will be performed. | |
1615 | |
1616 This makes the function calls for Apply and takes care of all error handling, | |
1617 return value propagation, and shared_vars. | |
1618 | |
1619 Note that this thread is NOT started upon instantiation because the function- | |
1620 calling logic is also used in the single-threaded case. | |
1621 """ | |
1622 | |
1623 def __init__(self, task_queue, logger, bucket_storage_uri_class=None, | |
1624 gsutil_api_map=None, debug=0): | |
1625 """Initializes the worker thread. | |
1626 | |
1627 Args: | |
1628 task_queue: The thread-safe queue from which this thread should obtain | |
1629 its work. | |
1630 logger: Logger to use for this thread. | |
1631 bucket_storage_uri_class: Class to instantiate for cloud StorageUris. | |
1632 Settable for testing/mocking. | |
1633 gsutil_api_map: Map of providers and API selector tuples to api classes | |
1634 which can be used to communicate with those providers. | |
1635 Used for instantiating the CloudApiDelegator class. | |
1636 debug: debug level for the CloudApiDelegator class. | |
1637 """ | |
1638 super(WorkerThread, self).__init__() | |
1639 self.task_queue = task_queue | |
1640 self.daemon = True | |
1641 self.cached_classes = {} | |
1642 self.shared_vars_updater = _SharedVariablesUpdater() | |
1643 | |
1644 self.thread_gsutil_api = None | |
1645 if bucket_storage_uri_class and gsutil_api_map: | |
1646 self.thread_gsutil_api = CloudApiDelegator( | |
1647 bucket_storage_uri_class, gsutil_api_map, logger, debug=debug) | |
1648 | |
1649 def PerformTask(self, task, cls): | |
1650 """Makes the function call for a task. | |
1651 | |
1652 Args: | |
1653 task: The Task to perform. | |
1654 cls: The instance of a class which gives context to the functions called | |
1655 by the Task's function. E.g., see SetAclFuncWrapper. | |
1656 """ | |
1657 caller_id = task.caller_id | |
1658 try: | |
1659 results = task.func(cls, task.args, thread_state=self.thread_gsutil_api) | |
1660 if task.should_return_results: | |
1661 global_return_values_map.Update(caller_id, [results], default_value=[]) | |
1662 except Exception, e: # pylint: disable=broad-except | |
1663 _IncrementFailureCount() | |
1664 if task.fail_on_error: | |
1665 raise # Only happens for single thread and process case. | |
1666 else: | |
1667 try: | |
1668 task.exception_handler(cls, e) | |
1669 except Exception, _: # pylint: disable=broad-except | |
1670 # Don't allow callers to raise exceptions here and kill the worker | |
1671 # threads. | |
1672 cls.logger.debug( | |
1673 'Caught exception while handling exception for %s:\n%s', | |
1674 task, traceback.format_exc()) | |
1675 finally: | |
1676 self.shared_vars_updater.Update(caller_id, cls) | |
1677 | |
1678 # Even if we encounter an exception, we still need to claim that | |
1679 # the function finished executing. Otherwise, we won't know when to | |
1680 # stop waiting and return results. | |
1681 num_done = caller_id_finished_count.Update(caller_id, 1) | |
1682 | |
1683 if cls.multiprocessing_is_available: | |
1684 _NotifyIfDone(caller_id, num_done) | |
1685 | |
1686 def run(self): | |
1687 while True: | |
1688 task = self.task_queue.get() | |
1689 caller_id = task.caller_id | |
1690 | |
1691 # Get the instance of the command with the appropriate context. | |
1692 cls = self.cached_classes.get(caller_id, None) | |
1693 if not cls: | |
1694 cls = copy.copy(class_map[caller_id]) | |
1695 cls.logger = CreateGsutilLogger(cls.command_name) | |
1696 self.cached_classes[caller_id] = cls | |
1697 | |
1698 self.PerformTask(task, cls) | |
1699 | |
1700 | |
1701 class _SharedVariablesUpdater(object): | |
1702 """Used to update shared variable for a class in the global map. | |
1703 | |
1704 Note that each thread will have its own instance of the calling class for | |
1705 context, and it will also have its own instance of a | |
1706 _SharedVariablesUpdater. This is used in the following way: | |
1707 | |
1708 1. Before any tasks are performed, each thread will get a copy of the | |
1709 calling class, and the globally-consistent value of this shared variable | |
1710 will be initialized to whatever it was before the call to Apply began. | |
1711 | |
1712 2. After each time a thread performs a task, it will look at the current | |
1713 values of the shared variables in its instance of the calling class. | |
1714 | |
1715 2.A. For each such variable, it computes the delta of this variable | |
1716 between the last known value for this class (which is stored in | |
1717 a dict local to this class) and the current value of the variable | |
1718 in the class. | |
1719 | |
1720 2.B. Using this delta, we update the last known value locally as well | |
1721 as the globally-consistent value shared across all classes (the | |
1722 globally consistent value is simply increased by the computed | |
1723 delta). | |
1724 """ | |
1725 | |
1726 def __init__(self): | |
1727 self.last_shared_var_values = {} | |
1728 | |
1729 def Update(self, caller_id, cls): | |
1730 """Update any shared variables with their deltas.""" | |
1731 shared_vars = shared_vars_list_map.get(caller_id, None) | |
1732 if shared_vars: | |
1733 for name in shared_vars: | |
1734 key = (caller_id, name) | |
1735 last_value = self.last_shared_var_values.get(key, 0) | |
1736 # Compute the change made since the last time we updated here. This is | |
1737 # calculated by simply subtracting the last known value from the current | |
1738 # value in the class instance. | |
1739 delta = getattr(cls, name) - last_value | |
1740 self.last_shared_var_values[key] = delta + last_value | |
1741 | |
1742 # Update the globally-consistent value by simply increasing it by the | |
1743 # computed delta. | |
1744 shared_vars_map.Update(key, delta) | |
1745 | |
1746 | |
1747 def _NotifyIfDone(caller_id, num_done): | |
1748 """Notify any threads waiting for results that something has finished. | |
1749 | |
1750 Each waiting thread will then need to check the call_completed_map to see if | |
1751 its work is done. | |
1752 | |
1753 Note that num_done could be calculated here, but it is passed in as an | |
1754 optimization so that we have one less call to a globally-locked data | |
1755 structure. | |
1756 | |
1757 Args: | |
1758 caller_id: The caller_id of the function whose progress we're checking. | |
1759 num_done: The number of tasks currently completed for that caller_id. | |
1760 """ | |
1761 num_to_do = total_tasks[caller_id] | |
1762 if num_to_do == num_done and num_to_do >= 0: | |
1763 # Notify the Apply call that's sleeping that it's ready to return. | |
1764 with need_pool_or_done_cond: | |
1765 call_completed_map[caller_id] = True | |
1766 need_pool_or_done_cond.notify_all() | |
1767 | |
1768 | |
1769 def ShutDownGsutil(): | |
1770 """Shut down all processes in consumer pools in preparation for exiting.""" | |
1771 for q in queues: | |
1772 try: | |
1773 q.cancel_join_thread() | |
1774 except: # pylint: disable=bare-except | |
1775 pass | |
1776 for consumer_pool in consumer_pools: | |
1777 consumer_pool.ShutDown() | |
1778 | |
1779 | |
1780 # pylint: disable=global-variable-undefined | |
1781 def _IncrementFailureCount(): | |
1782 global failure_count | |
1783 if isinstance(failure_count, int): | |
1784 failure_count += 1 | |
1785 else: # Otherwise it's a multiprocessing.Value() of type 'i'. | |
1786 failure_count.value += 1 | |
1787 | |
1788 | |
1789 # pylint: disable=global-variable-undefined | |
1790 def GetFailureCount(): | |
1791 """Returns the number of failures processed during calls to Apply().""" | |
1792 try: | |
1793 if isinstance(failure_count, int): | |
1794 return failure_count | |
1795 else: # It's a multiprocessing.Value() of type 'i'. | |
1796 return failure_count.value | |
1797 except NameError: # If it wasn't initialized, Apply() wasn't called. | |
1798 return 0 | |
1799 | |
1800 | |
1801 def ResetFailureCount(): | |
1802 """Resets the failure_count variable to 0 - useful if error is expected.""" | |
1803 try: | |
1804 global failure_count | |
1805 if isinstance(failure_count, int): | |
1806 failure_count = 0 | |
1807 else: # It's a multiprocessing.Value() of type 'i'. | |
1808 failure_count = multiprocessing.Value('i', 0) | |
1809 except NameError: # If it wasn't initialized, Apply() wasn't called. | |
1810 pass | |
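# A hedged usage sketch (hypothetical caller, assuming the Apply signature
# defined earlier in this file): after an Apply call with fail_on_error=False,
# a command can consult the failure count to decide its exit status.
#
#   self.Apply(SetAclFuncWrapper, name_expansion_iterator,
#              _DefaultExceptionHandler, fail_on_error=False)
#   if GetFailureCount():
#     return 1   # At least one task failed.
#   return 0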
OLD | NEW |