| Index: third_party/gsutil/gslib/command.py
|
| diff --git a/third_party/gsutil/gslib/command.py b/third_party/gsutil/gslib/command.py
|
| index 3823b91505696bf08f8c3edd03d4af061a450575..1a1da9617582c6b9da76b5888c419f16db741ff0 100644
|
| --- a/third_party/gsutil/gslib/command.py
|
| +++ b/third_party/gsutil/gslib/command.py
|
| @@ -50,20 +50,19 @@ from gslib.exception import CommandException
|
| from gslib.help_provider import HelpProvider
|
| from gslib.name_expansion import NameExpansionIterator
|
| from gslib.name_expansion import NameExpansionResult
|
| -from gslib.parallelism_framework_util import AtomicIncrementDict
|
| -from gslib.parallelism_framework_util import BasicIncrementDict
|
| -from gslib.parallelism_framework_util import ThreadAndProcessSafeDict
|
| +from gslib.parallelism_framework_util import AtomicDict
|
| from gslib.plurality_checkable_iterator import PluralityCheckableIterator
|
| from gslib.sig_handling import RegisterSignalHandler
|
| from gslib.storage_url import StorageUrlFromString
|
| from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
|
| from gslib.translation_helper import AclTranslation
|
| +from gslib.translation_helper import PRIVATE_DEFAULT_OBJ_ACL
|
| +from gslib.util import CheckMultiprocessingAvailableAndInit
|
| from gslib.util import GetConfigFilePath
|
| from gslib.util import GsutilStreamHandler
|
| from gslib.util import HaveFileUrls
|
| from gslib.util import HaveProviderUrls
|
| from gslib.util import IS_WINDOWS
|
| -from gslib.util import MultiprocessingIsAvailable
|
| from gslib.util import NO_MAX
|
| from gslib.util import UrlsAreForSingleProvider
|
| from gslib.util import UTF8
|
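Note: the three dict wrappers removed above (AtomicIncrementDict,
BasicIncrementDict, ThreadAndProcessSafeDict) were consolidated into a single
AtomicDict that can be backed either by a multiprocessing.Manager or by a
plain threading lock. The hunks below rely on roughly these semantics; this
is a minimal illustrative sketch, not the real
gslib/parallelism_framework_util.py implementation:

  import threading

  class AtomicDict(object):
      """Thread-safe dict with an atomic read-modify-write Increment.

      Backed by a multiprocessing.Manager it is process-safe as well;
      without one it falls back to a plain lock and dict (threading mode).
      """

      def __init__(self, manager=None):
          if manager is not None:
              self.lock = manager.Lock()
              self.dict = manager.dict()
          else:
              self.lock = threading.Lock()
              self.dict = {}

      def __getitem__(self, key):
          with self.lock:
              return self.dict[key]

      def __setitem__(self, key, value):
          with self.lock:
              self.dict[key] = value

      def get(self, key, default_value=None):
          with self.lock:
              return self.dict.get(key, default_value)

      def Increment(self, key, inc, default_value=0):
          # Read-modify-write under the lock; list concatenation works too
          # (e.g. accumulating per-caller return values).
          with self.lock:
              value = self.dict.get(key, default_value)
              value += inc
              self.dict[key] = value
              return value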
| @@ -221,14 +220,14 @@ def InitializeMultiprocessingVariables():
|
| caller_id_counter = multiprocessing.Value('i', 0)
|
|
|
| # Map from caller_id to total number of tasks to be completed for that ID.
|
| - total_tasks = ThreadAndProcessSafeDict(manager)
|
| + total_tasks = AtomicDict(manager=manager)
|
|
|
| # Map from caller_id to a boolean which is True iff all its tasks are
|
| # finished.
|
| - call_completed_map = ThreadAndProcessSafeDict(manager)
|
| + call_completed_map = AtomicDict(manager=manager)
|
|
|
| # Used to keep track of the set of return values for each caller ID.
|
| - global_return_values_map = AtomicIncrementDict(manager)
|
| + global_return_values_map = AtomicDict(manager=manager)
|
|
|
| # Condition used to notify any waiting threads that a task has finished or
|
| # that a call to Apply needs a new set of consumer processes.
|
| @@ -239,7 +238,7 @@ def InitializeMultiprocessingVariables():
|
| worker_checking_level_lock = manager.Lock()
|
|
|
| # Map from caller_id to the current number of completed tasks for that ID.
|
| - caller_id_finished_count = AtomicIncrementDict(manager)
|
| + caller_id_finished_count = AtomicDict(manager=manager)
|
|
|
| # Used as a way for the main thread to distinguish between being woken up
|
| # by another call finishing and being woken up by a call that needs a new set
|
| @@ -249,8 +248,8 @@ def InitializeMultiprocessingVariables():
|
| current_max_recursive_level = multiprocessing.Value('i', 0)
|
|
|
| # Map from (caller_id, name) to the value of that shared variable.
|
| - shared_vars_map = AtomicIncrementDict(manager)
|
| - shared_vars_list_map = ThreadAndProcessSafeDict(manager)
|
| + shared_vars_map = AtomicDict(manager=manager)
|
| + shared_vars_list_map = AtomicDict(manager=manager)
|
|
|
| # Map from caller_id to calling class.
|
| class_map = manager.dict()
|
| @@ -259,6 +258,33 @@ def InitializeMultiprocessingVariables():
|
| failure_count = multiprocessing.Value('i', 0)
|
|
|
|
|
| +def InitializeThreadingVariables():
|
| + """Initializes module-level variables used when running multi-threaded.
|
| +
|
| + When multiprocessing is not available (or on Windows where only 1 process
|
| + is used), thread-safe analogs to the multiprocessing global variables
|
| + must be initialized. This function is the thread-safe analog to
|
| + InitializeMultiprocessingVariables.
|
| + """
|
| + # pylint: disable=global-variable-undefined
|
| + global global_return_values_map, shared_vars_map, failure_count
|
| + global caller_id_finished_count, shared_vars_list_map, total_tasks
|
| + global need_pool_or_done_cond, call_completed_map, class_map
|
| + global task_queues, caller_id_lock, caller_id_counter
|
| + caller_id_counter = 0
|
| + caller_id_finished_count = AtomicDict()
|
| + caller_id_lock = threading.Lock()
|
| + call_completed_map = AtomicDict()
|
| + class_map = AtomicDict()
|
| + failure_count = 0
|
| + global_return_values_map = AtomicDict()
|
| + need_pool_or_done_cond = threading.Condition()
|
| + shared_vars_list_map = AtomicDict()
|
| + shared_vars_map = AtomicDict()
|
| + task_queues = []
|
| + total_tasks = AtomicDict()
|
| +
|
| +
|
| # Each subclass of Command must define a property named 'command_spec' that is
|
| # an instance of the following class.
|
| CommandSpec = namedtuple('CommandSpec', [
|
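Note: with both initializers defined, startup can choose one based on
platform support. The wiring below is hypothetical (the real call sites are
in gsutil's startup path), but it shows the intended pairing with
CheckMultiprocessingAvailableAndInit:

  import gslib.command as command
  from gslib.util import CheckMultiprocessingAvailableAndInit

  # Hypothetical startup wiring: process-safe state where multiprocessing
  # works, thread-safe analogs everywhere else (e.g. Windows).
  if CheckMultiprocessingAvailableAndInit().is_available:
      command.InitializeMultiprocessingVariables()
  else:
      command.InitializeThreadingVariables()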
| @@ -359,9 +385,9 @@ class Command(HelpProvider):
|
| 'command_name': self.command_name})))
|
| return args
|
|
|
| - def __init__(self, command_runner, args, headers, debug, parallel_operations,
|
| - bucket_storage_uri_class, gsutil_api_class_map_factory,
|
| - test_method=None, logging_filters=None,
|
| + def __init__(self, command_runner, args, headers, debug, trace_token,
|
| + parallel_operations, bucket_storage_uri_class,
|
| + gsutil_api_class_map_factory, logging_filters=None,
|
| command_alias_used=None):
|
| """Instantiates a Command.
|
|
|
| @@ -370,15 +396,13 @@ class Command(HelpProvider):
|
| args: Command-line args (arg0 = actual arg, not command name ala bash).
|
| headers: Dictionary containing optional HTTP headers to pass to boto.
|
| debug: Debug level to pass in to boto connection (range 0..3).
|
| + trace_token: Trace token to pass to the API implementation.
|
| parallel_operations: Should command operations be executed in parallel?
|
| bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
|
| Settable for testing/mocking.
|
| gsutil_api_class_map_factory: Creates map of cloud storage interfaces.
|
| Settable for testing/mocking.
|
| - test_method: Optional general purpose method for testing purposes.
|
| - Application and semantics of this method will vary by
|
| - command and test type.
|
| - logging_filters: Optional list of logging.Filters to apply to this
|
| + logging_filters: Optional list of logging.Filters to apply to this
|
| command's logger.
|
| command_alias_used: The alias that was actually used when running this
|
| command (as opposed to the "official" command name,
|
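Note: direct instantiations must add trace_token (immediately after debug)
and drop test_method. A hypothetical call site under the new signature; the
argument values are illustrative only:

  from boto.storage_uri import BucketStorageUri
  from gslib.commands.ls import LsCommand
  from gslib.cs_api_map import GsutilApiClassMapFactory

  # command_runner: an existing gslib.command_runner.CommandRunner instance.
  cmd = LsCommand(command_runner, ['-l', 'gs://my-bucket'], headers={},
                  debug=0, trace_token=None, parallel_operations=False,
                  bucket_storage_uri_class=BucketStorageUri,
                  gsutil_api_class_map_factory=GsutilApiClassMapFactory)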
| @@ -395,10 +419,10 @@ class Command(HelpProvider):
|
| self.unparsed_args = args
|
| self.headers = headers
|
| self.debug = debug
|
| + self.trace_token = trace_token
|
| self.parallel_operations = parallel_operations
|
| self.bucket_storage_uri_class = bucket_storage_uri_class
|
| self.gsutil_api_class_map_factory = gsutil_api_class_map_factory
|
| - self.test_method = test_method
|
| self.exclude_symlinks = False
|
| self.recursion_requested = False
|
| self.all_versions = False
|
| @@ -445,7 +469,7 @@ class Command(HelpProvider):
|
| self.project_id = None
|
| self.gsutil_api = CloudApiDelegator(
|
| bucket_storage_uri_class, self.gsutil_api_map,
|
| - self.logger, debug=self.debug)
|
| + self.logger, debug=self.debug, trace_token=self.trace_token)
|
|
|
| # Cross-platform path to run gsutil binary.
|
| self.gsutil_cmd = ''
|
| @@ -464,7 +488,8 @@ class Command(HelpProvider):
|
| self.recursion_requested = True
|
| break
|
|
|
| - self.multiprocessing_is_available = MultiprocessingIsAvailable()[0]
|
| + self.multiprocessing_is_available = (
|
| + CheckMultiprocessingAvailableAndInit().is_available)
|
|
|
| def RaiseWrongNumberOfArgumentsException(self):
|
| """Raises exception for wrong number of arguments supplied to command."""
|
| @@ -689,6 +714,10 @@ class Command(HelpProvider):
|
| else:
|
| def_obj_acl = AclTranslation.JsonToMessage(
|
| self.acl_arg, apitools_messages.ObjectAccessControl)
|
| + if not def_obj_acl:
|
| + # Use a sentinel value to indicate a private (no entries) default
|
| + # object ACL.
|
| + def_obj_acl.append(PRIVATE_DEFAULT_OBJ_ACL)
|
| bucket_metadata = apitools_messages.Bucket(
|
| defaultObjectAcl=def_obj_acl)
|
| gsutil_api.PatchBucket(url.bucket_name, bucket_metadata,
|
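Note: an explicitly private default object ACL is an empty list, but an
empty defaultObjectAcl field would be indistinguishable from an unset one
and dropped from the PATCH request. The PRIVATE_DEFAULT_OBJ_ACL sentinel
records the intent so the API layer can translate it back into an explicit
empty ACL. A hypothetical helper sketching the consuming side (the real
handling lives in gsutil's API translation layer):

  from gslib.translation_helper import PRIVATE_DEFAULT_OBJ_ACL

  def resolve_default_obj_acl(def_obj_acl):
      # Hypothetical: unwrap the sentinel before building the request.
      if def_obj_acl == [PRIVATE_DEFAULT_OBJ_ACL]:
          # Explicitly-requested private ACL: send an empty list rather
          # than omitting the field.
          return []
      return def_obj_acl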
| @@ -749,8 +778,6 @@ class Command(HelpProvider):
|
| self.canned = False
|
| else:
|
| # No file exists, so expect a canned ACL string.
|
| - # Canned ACLs are not supported in JSON and we need to use the XML API
|
| - # to set them.
|
| # validate=False because we allow wildcard urls.
|
| storage_uri = boto.storage_uri(
|
| url_args[0], debug=self.debug, validate=False,
|
| @@ -866,8 +893,7 @@ class Command(HelpProvider):
|
| 'command' % (url_str, self.command_name))
|
| return list(plurality_iter)[0]
|
|
|
| - def _HandleMultiProcessingSigs(self, unused_signal_num,
|
| - unused_cur_stack_frame):
|
| + def _HandleMultiProcessingSigs(self, signal_num, unused_cur_stack_frame):
|
| """Handles signals INT AND TERM during a multi-process/multi-thread request.
|
|
|
| Kills subprocesses.
|
| @@ -880,7 +906,8 @@ class Command(HelpProvider):
|
| # https://github.com/GoogleCloudPlatform/gsutil/issues/99 for details
|
| # about why making it work correctly across OS's is harder and still open.
|
| ShutDownGsutil()
|
| - sys.stderr.write('Caught ^C - exiting\n')
|
| + if signal_num == signal.SIGINT:
|
| + sys.stderr.write('Caught ^C - exiting\n')
|
| # Simply calling sys.exit(1) doesn't work - see above bug for details.
|
| KillProcess(os.getpid())
|
|
|
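Note: passing the real signal number through lets the handler print the
'Caught ^C' message only for SIGINT, while SIGTERM performs the same worker
shutdown but exits quietly. Registration would look roughly like this
(hypothetical wiring; self is the Command instance):

  import signal

  from gslib.sig_handling import RegisterSignalHandler

  # Hypothetical: register the shared handler for both signals.
  RegisterSignalHandler(signal.SIGINT, self._HandleMultiProcessingSigs)
  RegisterSignalHandler(signal.SIGTERM, self._HandleMultiProcessingSigs)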
| @@ -1002,10 +1029,21 @@ class Command(HelpProvider):
|
|
|
| def _SetUpPerCallerState(self):
|
| """Set up the state for a caller id, corresponding to one Apply call."""
|
| + # pylint: disable=global-variable-undefined,global-variable-not-assigned
|
| + # These variables are initialized in InitializeMultiprocessingVariables or
|
| + # InitializeThreadingVariables
|
| + global global_return_values_map, shared_vars_map, failure_count
|
| + global caller_id_finished_count, shared_vars_list_map, total_tasks
|
| + global need_pool_or_done_cond, call_completed_map, class_map
|
| + global task_queues, caller_id_lock, caller_id_counter
|
| # Get a new caller ID.
|
| with caller_id_lock:
|
| - caller_id_counter.value += 1
|
| - caller_id = caller_id_counter.value
|
| + if isinstance(caller_id_counter, int):
|
| + caller_id_counter += 1
|
| + caller_id = caller_id_counter
|
| + else:
|
| + caller_id_counter.value += 1
|
| + caller_id = caller_id_counter.value
|
|
|
| # Create a copy of self with an incremented recursive level. This allows
|
| # the class to report its level correctly if the function called from it
|
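Note: the isinstance check is what lets this method serve both initializers:
InitializeThreadingVariables makes caller_id_counter a plain int, while
InitializeMultiprocessingVariables makes it a multiprocessing.Value('i', 0).
Reduced to its essentials (illustrative):

  import threading

  caller_id_lock = threading.Lock()  # a manager.Lock() in process mode
  caller_id_counter = 0              # multiprocessing.Value('i', 0) in
                                     # process mode

  def next_caller_id():
      global caller_id_counter
      with caller_id_lock:
          if isinstance(caller_id_counter, int):
              caller_id_counter += 1
              return caller_id_counter
          caller_id_counter.value += 1
          return caller_id_counter.value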
| @@ -1025,8 +1063,8 @@ class Command(HelpProvider):
|
| class_map[caller_id] = cls
|
| total_tasks[caller_id] = -1 # -1 => the producer hasn't finished yet.
|
| call_completed_map[caller_id] = False
|
| - caller_id_finished_count.Put(caller_id, 0)
|
| - global_return_values_map.Put(caller_id, [])
|
| + caller_id_finished_count[caller_id] = 0
|
| + global_return_values_map[caller_id] = []
|
| return caller_id
|
|
|
| def _CreateNewConsumerPool(self, num_processes, num_threads):
|
| @@ -1103,32 +1141,16 @@ class Command(HelpProvider):
|
| # fact that it's wasteful to try this multiple times in general, it also
|
| # will never work when called from a subprocess since we use daemon
|
| # processes, and daemons can't create other processes.
|
| - if is_main_thread:
|
| - if ((not self.multiprocessing_is_available)
|
| - and thread_count * process_count > 1):
|
| - # Run the check again and log the appropriate warnings. This was run
|
| - # before, when the Command object was created, in order to calculate
|
| - # self.multiprocessing_is_available, but we don't want to print the
|
| - # warning until we're sure the user actually tried to use multiple
|
| - # threads or processes.
|
| - MultiprocessingIsAvailable(logger=self.logger)
|
| -
|
| - if self.multiprocessing_is_available:
|
| - caller_id = self._SetUpPerCallerState()
|
| - else:
|
| - self.sequential_caller_id += 1
|
| - caller_id = self.sequential_caller_id
|
| -
|
| - if is_main_thread:
|
| - # pylint: disable=global-variable-undefined
|
| - global global_return_values_map, shared_vars_map, failure_count
|
| - global caller_id_finished_count, shared_vars_list_map
|
| - global_return_values_map = BasicIncrementDict()
|
| - global_return_values_map.Put(caller_id, [])
|
| - shared_vars_map = BasicIncrementDict()
|
| - caller_id_finished_count = BasicIncrementDict()
|
| - shared_vars_list_map = {}
|
| - failure_count = 0
|
| + if (is_main_thread and not self.multiprocessing_is_available and
|
| + process_count > 1):
|
| + # Run the check again and log the appropriate warnings. This was run
|
| + # before, when the Command object was created, in order to calculate
|
| + # self.multiprocessing_is_available, but we don't want to print the
|
| + # warning until we're sure the user actually tried to use multiple
|
| + # threads or processes.
|
| + CheckMultiprocessingAvailableAndInit(logger=self.logger)
|
| +
|
| + caller_id = self._SetUpPerCallerState()
|
|
|
| # If any shared attributes passed by caller, create a dictionary of
|
| # shared memory variables for every element in the list of shared
|
| @@ -1136,12 +1158,14 @@ class Command(HelpProvider):
|
| if shared_attrs:
|
| shared_vars_list_map[caller_id] = shared_attrs
|
| for name in shared_attrs:
|
| - shared_vars_map.Put((caller_id, name), 0)
|
| + shared_vars_map[(caller_id, name)] = 0
|
|
|
| # Make all of the requested function calls.
|
| - if self.multiprocessing_is_available and thread_count * process_count > 1:
|
| + usable_processes_count = (process_count if self.multiprocessing_is_available
|
| + else 1)
|
| + if thread_count * usable_processes_count > 1:
|
| self._ParallelApply(func, args_iterator, exception_handler, caller_id,
|
| - arg_checker, process_count, thread_count,
|
| + arg_checker, usable_processes_count, thread_count,
|
| should_return_results, fail_on_error)
|
| else:
|
| self._SequentialApply(func, args_iterator, exception_handler, caller_id,
|
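Note: the net effect of this hunk is that an unavailable multiprocessing
module no longer forces fully sequential execution; only the process
dimension collapses to 1, and thread-only parallelism still engages.
Restated in isolation (illustrative):

  def choose_apply_mode(multiprocessing_is_available, process_count,
                        thread_count):
      # Only the process dimension collapses when multiprocessing is
      # unavailable; threads remain usable.
      usable_processes_count = (process_count if multiprocessing_is_available
                                else 1)
      if thread_count * usable_processes_count > 1:
          return ('parallel', usable_processes_count, thread_count)
      return ('sequential', 1, 1)

  # e.g. choose_apply_mode(False, 12, 5) -> ('parallel', 1, 5)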
| @@ -1153,11 +1177,11 @@ class Command(HelpProvider):
|
| # and simply apply the delta after what was done during the call to
|
| # apply.
|
| final_value = (original_shared_vars_values[name] +
|
| - shared_vars_map.Get((caller_id, name)))
|
| + shared_vars_map.get((caller_id, name)))
|
| setattr(self, name, final_value)
|
|
|
| if should_return_results:
|
| - return global_return_values_map.Get(caller_id)
|
| + return global_return_values_map.get(caller_id)
|
|
|
| def _MaybeSuggestGsutilDashM(self):
|
| """Outputs a sugestion to the user to use gsutil -m."""
|
| @@ -1222,6 +1246,10 @@ class Command(HelpProvider):
|
| # start and it scrolled off-screen.
|
| self._MaybeSuggestGsutilDashM()
|
|
|
| + # If the task for the final iterated argument modifies shared_attrs
|
| + # before raising an exception, we still need to publish those updates.
|
| + worker_thread.shared_vars_updater.Update(caller_id, self)
|
| +
|
| # pylint: disable=g-doc-args
|
| def _ParallelApply(self, func, args_iterator, exception_handler, caller_id,
|
| arg_checker, process_count, thread_count,
|
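Note: the Update call added above covers the case where the task for the
final argument mutates a shared attribute and then raises; with no further
PerformTask call, nothing else would flush that delta. The updater's
contract is delta-based, roughly as sketched here (illustrative; the real
class is _SharedVariablesUpdater, shown in a later hunk):

  class SharedVariablesUpdaterSketch(object):
      def __init__(self):
          self.last_shared_var_values = {}

      def Update(self, caller_id, cls, shared_vars_list_map, shared_vars_map):
          # Publish only the delta since the last flush of each shared var.
          for name in shared_vars_list_map.get(caller_id, []):
              key = (caller_id, name)
              new_value = getattr(cls, name)
              delta = new_value - self.last_shared_var_values.get(key, 0)
              if delta:
                  shared_vars_map.Increment(key, delta)
                  self.last_shared_var_values[key] = new_value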
| @@ -1283,7 +1311,10 @@ class Command(HelpProvider):
|
| # of task queues if it makes a call to Apply, so we always keep around
|
| # one more queue than we know we need. OTOH, if we don't create a new
|
| # process, the existing process still needs a task queue to use.
|
| - task_queues.append(_NewMultiprocessingQueue())
|
| + if process_count > 1:
|
| + task_queues.append(_NewMultiprocessingQueue())
|
| + else:
|
| + task_queues.append(_NewThreadsafeQueue())
|
|
|
| if process_count > 1: # Handle process pool creation.
|
| # Check whether this call will need a new set of workers.
|
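Note: with a single process there is no cross-process hand-off, so a
multiprocessing queue (and the possibly-unavailable machinery behind it) is
unnecessary; a plain thread-safe queue suffices. The same choice recurs
below for the per-call queue. Stripped of gsutil's wrappers (illustrative):

  import multiprocessing
  try:
      import Queue as queue_module  # Python 2, matching this codebase
  except ImportError:
      import queue as queue_module  # Python 3

  # Illustrative stand-ins for _NewMultiprocessingQueue and
  # _NewThreadsafeQueue.
  def new_task_queue(process_count):
      if process_count > 1:
          return multiprocessing.Queue()
      return queue_module.Queue()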
| @@ -1315,8 +1346,10 @@ class Command(HelpProvider):
|
| # be consumer pools trying to use our processes.
|
| if process_count > 1:
|
| task_queue = task_queues[self.recursive_apply_level]
|
| - else:
|
| + elif self.multiprocessing_is_available:
|
| task_queue = _NewMultiprocessingQueue()
|
| + else:
|
| + task_queue = _NewThreadsafeQueue()
|
|
|
| # Kick off a producer thread to throw tasks in the global task queue. We
|
| # do this asynchronously so that the main thread can be free to create new
|
| @@ -1394,16 +1427,25 @@ class Command(HelpProvider):
|
|
|
| task_queue = task_queue or task_queues[recursive_apply_level]
|
|
|
| + # Ensure fairness across processes by filling our WorkerPool only with
|
| + # as many tasks as it has WorkerThreads. This semaphore is acquired each
|
| + # time that a task is retrieved from the queue and released each time
|
| + # a task is completed by a WorkerThread.
|
| + worker_semaphore = threading.BoundedSemaphore(thread_count)
|
| +
|
| assert thread_count * process_count > 1, (
|
| 'Invalid state, calling command._ApplyThreads with only one thread '
|
| 'and process.')
|
| + # TODO: Presently, this pool gets recreated with each call to Apply. We
|
| + # should be able to do it just once, at process creation time.
|
| worker_pool = WorkerPool(
|
| - thread_count, self.logger,
|
| + thread_count, self.logger, worker_semaphore,
|
| bucket_storage_uri_class=self.bucket_storage_uri_class,
|
| gsutil_api_map=self.gsutil_api_map, debug=self.debug)
|
|
|
| num_enqueued = 0
|
| while True:
|
| + worker_semaphore.acquire()
|
| task = task_queue.get()
|
| if task.args != ZERO_TASKS_TO_DO_ARGUMENT:
|
| # If we have no tasks to do and we're performing a blocking call, we
|
| @@ -1411,6 +1453,9 @@ class Command(HelpProvider):
|
| # the call to task_queue.get() forever.
|
| worker_pool.AddTask(task)
|
| num_enqueued += 1
|
| + else:
|
| + # Sentinel task: no WorkerThread will release this slot, so free it here.
|
| + worker_semaphore.release()
|
|
|
| if is_blocking_call:
|
| num_to_do = total_tasks[task.caller_id]
|
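Note: the acquire/release pairing is the fairness mechanism itself: each
process may hold at most thread_count dequeued-but-unfinished tasks, so it
cannot drain a queue shared with other processes. The pattern in miniature
(illustrative; the matching release appears in the WorkerThread hunks below):

  import threading

  def feed_tasks(task_queue, thread_count, make_worker_pool):
      # At most thread_count tasks may be dequeued but not yet completed.
      worker_semaphore = threading.BoundedSemaphore(thread_count)
      # The pool's WorkerThreads release the semaphore as tasks finish.
      worker_pool = make_worker_pool(worker_semaphore)
      while True:
          worker_semaphore.acquire()
          task = task_queue.get()
          if task is None:  # stand-in for ZERO_TASKS_TO_DO_ARGUMENT
              # The sentinel never reaches a worker, so no completion
              # release will come; free the slot here.
              worker_semaphore.release()
              break
          worker_pool.AddTask(task)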
| @@ -1588,19 +1633,19 @@ class ProducerThread(threading.Thread):
|
| # It's possible that the workers finished before we updated total_tasks,
|
| # so we need to check here as well.
|
| _NotifyIfDone(self.caller_id,
|
| - caller_id_finished_count.Get(self.caller_id))
|
| + caller_id_finished_count.get(self.caller_id))
|
|
|
|
|
| class WorkerPool(object):
|
| """Pool of worker threads to which tasks can be added."""
|
|
|
| - def __init__(self, thread_count, logger, bucket_storage_uri_class=None,
|
| - gsutil_api_map=None, debug=0):
|
| + def __init__(self, thread_count, logger, worker_semaphore,
|
| + bucket_storage_uri_class=None, gsutil_api_map=None, debug=0):
|
| self.task_queue = _NewThreadsafeQueue()
|
| self.threads = []
|
| for _ in range(thread_count):
|
| worker_thread = WorkerThread(
|
| - self.task_queue, logger,
|
| + self.task_queue, logger, worker_semaphore=worker_semaphore,
|
| bucket_storage_uri_class=bucket_storage_uri_class,
|
| gsutil_api_map=gsutil_api_map, debug=debug)
|
| self.threads.append(worker_thread)
|
| @@ -1620,14 +1665,16 @@ class WorkerThread(threading.Thread):
|
| calling logic is also used in the single-threaded case.
|
| """
|
|
|
| - def __init__(self, task_queue, logger, bucket_storage_uri_class=None,
|
| - gsutil_api_map=None, debug=0):
|
| + def __init__(self, task_queue, logger, worker_semaphore=None,
|
| + bucket_storage_uri_class=None, gsutil_api_map=None, debug=0):
|
| """Initializes the worker thread.
|
|
|
| Args:
|
| task_queue: The thread-safe queue from which this thread should obtain
|
| its work.
|
| logger: Logger to use for this thread.
|
| + worker_semaphore: threading.BoundedSemaphore to be released each time a
|
| + task is completed, or None for single-threaded execution.
|
| bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
|
| Settable for testing/mocking.
|
| gsutil_api_map: Map of providers and API selector tuples to api classes
|
| @@ -1637,6 +1684,7 @@ class WorkerThread(threading.Thread):
|
| """
|
| super(WorkerThread, self).__init__()
|
| self.task_queue = task_queue
|
| + self.worker_semaphore = worker_semaphore
|
| self.daemon = True
|
| self.cached_classes = {}
|
| self.shared_vars_updater = _SharedVariablesUpdater()
|
| @@ -1658,7 +1706,8 @@ class WorkerThread(threading.Thread):
|
| try:
|
| results = task.func(cls, task.args, thread_state=self.thread_gsutil_api)
|
| if task.should_return_results:
|
| - global_return_values_map.Update(caller_id, [results], default_value=[])
|
| + global_return_values_map.Increment(caller_id, [results],
|
| + default_value=[])
|
| except Exception, e: # pylint: disable=broad-except
|
| _IncrementFailureCount()
|
| if task.fail_on_error:
|
| @@ -1673,15 +1722,15 @@ class WorkerThread(threading.Thread):
|
| 'Caught exception while handling exception for %s:\n%s',
|
| task, traceback.format_exc())
|
| finally:
|
| + if self.worker_semaphore:
|
| + self.worker_semaphore.release()
|
| self.shared_vars_updater.Update(caller_id, cls)
|
|
|
| # Even if we encounter an exception, we still need to claim that
|
| # the function finished executing. Otherwise, we won't know when to
|
| # stop waiting and return results.
|
| - num_done = caller_id_finished_count.Update(caller_id, 1)
|
| -
|
| - if cls.multiprocessing_is_available:
|
| - _NotifyIfDone(caller_id, num_done)
|
| + num_done = caller_id_finished_count.Increment(caller_id, 1)
|
| + _NotifyIfDone(caller_id, num_done)
|
|
|
| def run(self):
|
| while True:
|
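Note: releasing in the finally block keeps the feeder's acquire balanced
even when task.func raises, and doing it before the completion bookkeeping
lets a free thread pick up new work promptly. The worker side of the
pairing, in miniature (illustrative):

  def run_task(task, worker_semaphore):
      # Mirrors the ordering in WorkerThread.PerformTask above.
      try:
          return task.func(task.args)
      finally:
          if worker_semaphore:
              # Balance the feeder's acquire even on exceptions.
              worker_semaphore.release()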
| @@ -1741,7 +1790,7 @@ class _SharedVariablesUpdater(object):
|
|
|
| # Update the globally-consistent value by simply increasing it by the
|
| # computed delta.
|
| - shared_vars_map.Update(key, delta)
|
| + shared_vars_map.Increment(key, delta)
|
|
|
|
|
| def _NotifyIfDone(caller_id, num_done):
|
|
|