Index: tools/telemetry/third_party/gsutilz/gslib/command.py
diff --git a/tools/telemetry/third_party/gsutilz/gslib/command.py b/tools/telemetry/third_party/gsutilz/gslib/command.py
index 3823b91505696bf08f8c3edd03d4af061a450575..1a1da9617582c6b9da76b5888c419f16db741ff0 100644
--- a/tools/telemetry/third_party/gsutilz/gslib/command.py
+++ b/tools/telemetry/third_party/gsutilz/gslib/command.py
@@ -50,20 +50,19 @@ from gslib.exception import CommandException
 from gslib.help_provider import HelpProvider
 from gslib.name_expansion import NameExpansionIterator
 from gslib.name_expansion import NameExpansionResult
-from gslib.parallelism_framework_util import AtomicIncrementDict
-from gslib.parallelism_framework_util import BasicIncrementDict
-from gslib.parallelism_framework_util import ThreadAndProcessSafeDict
+from gslib.parallelism_framework_util import AtomicDict
 from gslib.plurality_checkable_iterator import PluralityCheckableIterator
 from gslib.sig_handling import RegisterSignalHandler
 from gslib.storage_url import StorageUrlFromString
 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
 from gslib.translation_helper import AclTranslation
+from gslib.translation_helper import PRIVATE_DEFAULT_OBJ_ACL
+from gslib.util import CheckMultiprocessingAvailableAndInit
 from gslib.util import GetConfigFilePath
 from gslib.util import GsutilStreamHandler
 from gslib.util import HaveFileUrls
 from gslib.util import HaveProviderUrls
 from gslib.util import IS_WINDOWS
-from gslib.util import MultiprocessingIsAvailable
 from gslib.util import NO_MAX
 from gslib.util import UrlsAreForSingleProvider
 from gslib.util import UTF8
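
The import changes above collapse three specialized dict types (AtomicIncrementDict, BasicIncrementDict, ThreadAndProcessSafeDict) into a single AtomicDict. A minimal sketch of the interface this patch relies on (item access, get(), and Increment()), assuming a simple lock-around-dict implementation rather than the actual gslib.parallelism_framework_util code:

    import threading

    class AtomicDict(object):
      """Lock-protected dict: thread-safe by default, process-safe when
      backed by a multiprocessing.Manager (the manager=manager calls below)."""

      def __init__(self, manager=None):
        if manager:
          self.lock = manager.Lock()
          self.dict = manager.dict()
        else:
          self.lock = threading.Lock()
          self.dict = {}

      def __getitem__(self, key):
        with self.lock:
          return self.dict[key]

      def __setitem__(self, key, value):
        with self.lock:
          self.dict[key] = value

      def get(self, key, default_value=None):
        with self.lock:
          return self.dict.get(key, default_value)

      def Increment(self, key, delta, default_value=0):
        # Atomic read-modify-write; subsumes AtomicIncrementDict's Update().
        # Works for lists too (e.g. delta=[results], default_value=[]).
        with self.lock:
          new_value = self.dict.get(key, default_value) + delta
          self.dict[key] = new_value
          return new_value
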
@@ -221,14 +220,14 @@ def InitializeMultiprocessingVariables():
   caller_id_counter = multiprocessing.Value('i', 0)
 
   # Map from caller_id to total number of tasks to be completed for that ID.
-  total_tasks = ThreadAndProcessSafeDict(manager)
+  total_tasks = AtomicDict(manager=manager)
 
   # Map from caller_id to a boolean which is True iff all its tasks are
   # finished.
-  call_completed_map = ThreadAndProcessSafeDict(manager)
+  call_completed_map = AtomicDict(manager=manager)
 
   # Used to keep track of the set of return values for each caller ID.
-  global_return_values_map = AtomicIncrementDict(manager)
+  global_return_values_map = AtomicDict(manager=manager)
 
   # Condition used to notify any waiting threads that a task has finished or
   # that a call to Apply needs a new set of consumer processes.
@@ -239,7 +238,7 @@ def InitializeMultiprocessingVariables():
   worker_checking_level_lock = manager.Lock()
 
   # Map from caller_id to the current number of completed tasks for that ID.
-  caller_id_finished_count = AtomicIncrementDict(manager)
+  caller_id_finished_count = AtomicDict(manager=manager)
 
   # Used as a way for the main thread to distinguish between being woken up
   # by another call finishing and being woken up by a call that needs a new set
@@ -249,8 +248,8 @@ def InitializeMultiprocessingVariables():
   current_max_recursive_level = multiprocessing.Value('i', 0)
 
   # Map from (caller_id, name) to the value of that shared variable.
-  shared_vars_map = AtomicIncrementDict(manager)
-  shared_vars_list_map = ThreadAndProcessSafeDict(manager)
+  shared_vars_map = AtomicDict(manager=manager)
+  shared_vars_list_map = AtomicDict(manager=manager)
 
   # Map from caller_id to calling class.
   class_map = manager.dict()
@@ -259,6 +258,33 @@ def InitializeMultiprocessingVariables():
   failure_count = multiprocessing.Value('i', 0)
+def InitializeThreadingVariables():
+  """Initializes module-level variables used when running multi-threaded.
+
+  When multiprocessing is not available (or on Windows where only 1 process
+  is used), thread-safe analogs to the multiprocessing global variables
+  must be initialized. This function is the thread-safe analog to
+  InitializeMultiprocessingVariables.
+  """
+  # pylint: disable=global-variable-undefined
+  global global_return_values_map, shared_vars_map, failure_count
+  global caller_id_finished_count, shared_vars_list_map, total_tasks
+  global need_pool_or_done_cond, call_completed_map, class_map
+  global task_queues, caller_id_lock, caller_id_counter
+  caller_id_counter = 0
+  caller_id_finished_count = AtomicDict()
+  caller_id_lock = threading.Lock()
+  call_completed_map = AtomicDict()
+  class_map = AtomicDict()
+  failure_count = 0
+  global_return_values_map = AtomicDict()
+  need_pool_or_done_cond = threading.Condition()
+  shared_vars_list_map = AtomicDict()
+  shared_vars_map = AtomicDict()
+  task_queues = []
+  total_tasks = AtomicDict()
+
+
 # Each subclass of Command must define a property named 'command_spec' that is
 # an instance of the following class.
 CommandSpec = namedtuple('CommandSpec', [
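
InitializeThreadingVariables mirrors InitializeMultiprocessingVariables: plain ints replace multiprocessing.Value counters, and unmanaged AtomicDicts replace manager-backed ones. The call site is outside this diff; a hypothetical sketch of the selection logic, using the CheckMultiprocessingAvailableAndInit helper imported above:

    # Hypothetical startup sequence (not part of this patch): initialize the
    # module-level shared state exactly once, choosing process-safe or
    # thread-safe primitives depending on platform support.
    if CheckMultiprocessingAvailableAndInit().is_available:
      InitializeMultiprocessingVariables()
    else:
      InitializeThreadingVariables()
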
@@ -359,9 +385,9 @@ class Command(HelpProvider):
                'command_name': self.command_name})))
     return args
 
-  def __init__(self, command_runner, args, headers, debug, parallel_operations,
-               bucket_storage_uri_class, gsutil_api_class_map_factory,
-               test_method=None, logging_filters=None,
+  def __init__(self, command_runner, args, headers, debug, trace_token,
+               parallel_operations, bucket_storage_uri_class,
+               gsutil_api_class_map_factory, logging_filters=None,
                command_alias_used=None):
     """Instantiates a Command.
 
@@ -370,15 +396,13 @@ class Command(HelpProvider):
       args: Command-line args (arg0 = actual arg, not command name ala bash).
       headers: Dictionary containing optional HTTP headers to pass to boto.
       debug: Debug level to pass in to boto connection (range 0..3).
+      trace_token: Trace token to pass to the API implementation.
       parallel_operations: Should command operations be executed in parallel?
       bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
                                 Settable for testing/mocking.
       gsutil_api_class_map_factory: Creates map of cloud storage interfaces.
                                     Settable for testing/mocking.
-      test_method: Optional general purpose method for testing purposes.
-                   Application and semantics of this method will vary by
-                   command and test type.
-      logging_filters: Optional list of logging.Filters to apply to this
+      logging_filters: Optional list of logging.Filters to apply to this
                        command's logger.
       command_alias_used: The alias that was actually used when running this
                           command (as opposed to the "official" command name,
@@ -395,10 +419,10 @@ class Command(HelpProvider):
     self.unparsed_args = args
     self.headers = headers
     self.debug = debug
+    self.trace_token = trace_token
     self.parallel_operations = parallel_operations
     self.bucket_storage_uri_class = bucket_storage_uri_class
     self.gsutil_api_class_map_factory = gsutil_api_class_map_factory
-    self.test_method = test_method
     self.exclude_symlinks = False
     self.recursion_requested = False
     self.all_versions = False
@@ -445,7 +469,7 @@ class Command(HelpProvider):
     self.project_id = None
     self.gsutil_api = CloudApiDelegator(
         bucket_storage_uri_class, self.gsutil_api_map,
-        self.logger, debug=self.debug)
+        self.logger, debug=self.debug, trace_token=self.trace_token)
 
     # Cross-platform path to run gsutil binary.
     self.gsutil_cmd = ''
@@ -464,7 +488,8 @@ class Command(HelpProvider):
           self.recursion_requested = True
           break
 
-    self.multiprocessing_is_available = MultiprocessingIsAvailable()[0]
+    self.multiprocessing_is_available = (
+        CheckMultiprocessingAvailableAndInit().is_available)
 
   def RaiseWrongNumberOfArgumentsException(self):
     """Raises exception for wrong number of arguments supplied to command."""
@@ -689,6 +714,10 @@ class Command(HelpProvider):
       else:
         def_obj_acl = AclTranslation.JsonToMessage(
             self.acl_arg, apitools_messages.ObjectAccessControl)
+        if not def_obj_acl:
+          # Use a sentinel value to indicate a private (no entries) default
+          # object ACL.
+          def_obj_acl.append(PRIVATE_DEFAULT_OBJ_ACL)
       bucket_metadata = apitools_messages.Bucket(
           defaultObjectAcl=def_obj_acl)
       gsutil_api.PatchBucket(url.bucket_name, bucket_metadata,
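
The sentinel guards against a serialization pitfall: an empty defaultObjectAcl list is indistinguishable from an unset field, so a patch request would silently leave the existing ACL in place instead of making the default object ACL private. Roughly, under that assumption:

    # Sketch: "private" means zero ACL entries, but sending an empty list
    # would look like "field not set" and PatchBucket would change nothing.
    # The sentinel keeps the list non-empty so the field is transmitted; the
    # API layer is assumed to strip PRIVATE_DEFAULT_OBJ_ACL (imported above
    # from gslib.translation_helper) before the request reaches the service.
    def_obj_acl = []                               # user requested private
    if not def_obj_acl:
      def_obj_acl.append(PRIVATE_DEFAULT_OBJ_ACL)  # force field serialization
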
@@ -749,8 +778,6 @@ class Command(HelpProvider):
         self.canned = False
       else:
         # No file exists, so expect a canned ACL string.
-        # Canned ACLs are not supported in JSON and we need to use the XML API
-        # to set them.
         # validate=False because we allow wildcard urls.
         storage_uri = boto.storage_uri(
             url_args[0], debug=self.debug, validate=False,
@@ -866,8 +893,7 @@ class Command(HelpProvider):
                              'command' % (url_str, self.command_name))
     return list(plurality_iter)[0]
 
-  def _HandleMultiProcessingSigs(self, unused_signal_num,
-                                 unused_cur_stack_frame):
+  def _HandleMultiProcessingSigs(self, signal_num, unused_cur_stack_frame):
     """Handles signals INT AND TERM during a multi-process/multi-thread request.
 
     Kills subprocesses.
@@ -880,7 +906,8 @@ class Command(HelpProvider):
     # https://github.com/GoogleCloudPlatform/gsutil/issues/99 for details
     # about why making it work correctly across OS's is harder and still open.
     ShutDownGsutil()
-    sys.stderr.write('Caught ^C - exiting\n')
+    if signal_num == signal.SIGINT:
+      sys.stderr.write('Caught ^C - exiting\n')
     # Simply calling sys.exit(1) doesn't work - see above bug for details.
     KillProcess(os.getpid())
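
Because one handler serves both SIGINT and SIGTERM (hence "signals INT AND TERM" in the docstring), it now receives the real signal number and prints the ^C message only for an actual keyboard interrupt. A hypothetical sketch of the registration, assuming gslib.sig_handling's RegisterSignalHandler takes the signal number and the handler (the actual call lives elsewhere in gsutil):

    import signal

    # One handler registered for both signals, which is why it must branch
    # on signal_num before printing the interrupt message.
    RegisterSignalHandler(signal.SIGINT, self._HandleMultiProcessingSigs)
    RegisterSignalHandler(signal.SIGTERM, self._HandleMultiProcessingSigs)
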
@@ -1002,10 +1029,21 @@ class Command(HelpProvider):
 
   def _SetUpPerCallerState(self):
     """Set up the state for a caller id, corresponding to one Apply call."""
+    # pylint: disable=global-variable-undefined,global-variable-not-assigned
+    # These variables are initialized in InitializeMultiprocessingVariables or
+    # InitializeThreadingVariables
+    global global_return_values_map, shared_vars_map, failure_count
+    global caller_id_finished_count, shared_vars_list_map, total_tasks
+    global need_pool_or_done_cond, call_completed_map, class_map
+    global task_queues, caller_id_lock, caller_id_counter
     # Get a new caller ID.
     with caller_id_lock:
-      caller_id_counter.value += 1
-      caller_id = caller_id_counter.value
+      if isinstance(caller_id_counter, int):
+        caller_id_counter += 1
+        caller_id = caller_id_counter
+      else:
+        caller_id_counter.value += 1
+        caller_id = caller_id_counter.value
 
     # Create a copy of self with an incremented recursive level. This allows
     # the class to report its level correctly if the function called from it
@@ -1025,8 +1063,8 @@ class Command(HelpProvider):
     class_map[caller_id] = cls
     total_tasks[caller_id] = -1  # -1 => the producer hasn't finished yet.
     call_completed_map[caller_id] = False
-    caller_id_finished_count.Put(caller_id, 0)
-    global_return_values_map.Put(caller_id, [])
+    caller_id_finished_count[caller_id] = 0
+    global_return_values_map[caller_id] = []
     return caller_id
 
   def _CreateNewConsumerPool(self, num_processes, num_threads):
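
The isinstance branch exists because the counter's type now depends on which initializer ran: InitializeThreadingVariables sets caller_id_counter = 0 (a plain int), while InitializeMultiprocessingVariables puts it in shared memory. A standalone illustration of the two counter flavors:

    import multiprocessing
    import threading

    # Process-safe counter: lives in shared memory, mutated through .value.
    mp_counter = multiprocessing.Value('i', 0)
    with mp_counter.get_lock():
      mp_counter.value += 1

    # Thread-safe counter: a plain int suffices when guarded by a lock, and
    # rebinding it requires the global statements seen above in the patch.
    counter_lock = threading.Lock()
    thread_counter = 0
    with counter_lock:
      thread_counter += 1
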
@@ -1103,32 +1141,16 @@ class Command(HelpProvider):
     # fact that it's wasteful to try this multiple times in general, it also
     # will never work when called from a subprocess since we use daemon
     # processes, and daemons can't create other processes.
-    if is_main_thread:
-      if ((not self.multiprocessing_is_available)
-          and thread_count * process_count > 1):
-        # Run the check again and log the appropriate warnings. This was run
-        # before, when the Command object was created, in order to calculate
-        # self.multiprocessing_is_available, but we don't want to print the
-        # warning until we're sure the user actually tried to use multiple
-        # threads or processes.
-        MultiprocessingIsAvailable(logger=self.logger)
-
-    if self.multiprocessing_is_available:
-      caller_id = self._SetUpPerCallerState()
-    else:
-      self.sequential_caller_id += 1
-      caller_id = self.sequential_caller_id
-
-    if is_main_thread:
-      # pylint: disable=global-variable-undefined
-      global global_return_values_map, shared_vars_map, failure_count
-      global caller_id_finished_count, shared_vars_list_map
-      global_return_values_map = BasicIncrementDict()
-      global_return_values_map.Put(caller_id, [])
-      shared_vars_map = BasicIncrementDict()
-      caller_id_finished_count = BasicIncrementDict()
-      shared_vars_list_map = {}
-      failure_count = 0
+    if (is_main_thread and not self.multiprocessing_is_available and
+        process_count > 1):
+      # Run the check again and log the appropriate warnings. This was run
+      # before, when the Command object was created, in order to calculate
+      # self.multiprocessing_is_available, but we don't want to print the
+      # warning until we're sure the user actually tried to use multiple
+      # threads or processes.
+      CheckMultiprocessingAvailableAndInit(logger=self.logger)
+
+    caller_id = self._SetUpPerCallerState()
 
     # If any shared attributes passed by caller, create a dictionary of
     # shared memory variables for every element in the list of shared
@@ -1136,12 +1158,14 @@ class Command(HelpProvider):
     if shared_attrs:
       shared_vars_list_map[caller_id] = shared_attrs
       for name in shared_attrs:
-        shared_vars_map.Put((caller_id, name), 0)
+        shared_vars_map[(caller_id, name)] = 0
 
     # Make all of the requested function calls.
-    if self.multiprocessing_is_available and thread_count * process_count > 1:
+    usable_processes_count = (process_count if self.multiprocessing_is_available
+                              else 1)
+    if thread_count * usable_processes_count > 1:
       self._ParallelApply(func, args_iterator, exception_handler, caller_id,
-                          arg_checker, process_count, thread_count,
+                          arg_checker, usable_processes_count, thread_count,
                           should_return_results, fail_on_error)
     else:
       self._SequentialApply(func, args_iterator, exception_handler, caller_id,
@@ -1153,11 +1177,11 @@ class Command(HelpProvider):
         # and simply apply the delta after what was done during the call to
        # apply.
        final_value = (original_shared_vars_values[name] +
-                       shared_vars_map.Get((caller_id, name)))
+                       shared_vars_map.get((caller_id, name)))
        setattr(self, name, final_value)
 
     if should_return_results:
-      return global_return_values_map.Get(caller_id)
+      return global_return_values_map.get(caller_id)
 
   def _MaybeSuggestGsutilDashM(self):
"""Outputs a sugestion to the user to use gsutil -m.""" |
@@ -1222,6 +1246,10 @@ class Command(HelpProvider):
         # start and it scrolled off-screen.
         self._MaybeSuggestGsutilDashM()
 
+    # If the final iterated argument results in an exception, and that
+    # exception modifies shared_attrs, we need to publish the results.
+    worker_thread.shared_vars_updater.Update(caller_id, self)
+
   # pylint: disable=g-doc-args
   def _ParallelApply(self, func, args_iterator, exception_handler, caller_id,
                      arg_checker, process_count, thread_count,
@@ -1283,7 +1311,10 @@ class Command(HelpProvider):
       # of task queues if it makes a call to Apply, so we always keep around
       # one more queue than we know we need. OTOH, if we don't create a new
       # process, the existing process still needs a task queue to use.
-      task_queues.append(_NewMultiprocessingQueue())
+      if process_count > 1:
+        task_queues.append(_NewMultiprocessingQueue())
+      else:
+        task_queues.append(_NewThreadsafeQueue())
 
     if process_count > 1:  # Handle process pool creation.
       # Check whether this call will need a new set of workers.
@@ -1315,8 +1346,10 @@ class Command(HelpProvider):
       # be consumer pools trying to use our processes.
       if process_count > 1:
         task_queue = task_queues[self.recursive_apply_level]
-      else:
+      elif self.multiprocessing_is_available:
         task_queue = _NewMultiprocessingQueue()
+      else:
+        task_queue = _NewThreadsafeQueue()
 
     # Kick off a producer thread to throw tasks in the global task queue. We
     # do this asynchronously so that the main thread can be free to create new
@@ -1394,16 +1427,25 @@ class Command(HelpProvider):
     task_queue = task_queue or task_queues[recursive_apply_level]
 
+    # Ensure fairness across processes by filling our WorkerPool only with
+    # as many tasks as it has WorkerThreads. This semaphore is acquired each
+    # time that a task is retrieved from the queue and released each time
+    # a task is completed by a WorkerThread.
+    worker_semaphore = threading.BoundedSemaphore(thread_count)
+
     assert thread_count * process_count > 1, (
         'Invalid state, calling command._ApplyThreads with only one thread '
         'and process.')
+    # TODO: Presently, this pool gets recreated with each call to Apply. We
+    # should be able to do it just once, at process creation time.
     worker_pool = WorkerPool(
-        thread_count, self.logger,
+        thread_count, self.logger, worker_semaphore,
         bucket_storage_uri_class=self.bucket_storage_uri_class,
         gsutil_api_map=self.gsutil_api_map, debug=self.debug)
 
     num_enqueued = 0
     while True:
+      worker_semaphore.acquire()
       task = task_queue.get()
       if task.args != ZERO_TASKS_TO_DO_ARGUMENT:
         # If we have no tasks to do and we're performing a blocking call, we
@@ -1411,6 +1453,9 @@ class Command(HelpProvider):
         # the call to task_queue.get() forever.
         worker_pool.AddTask(task)
         num_enqueued += 1
+      else:
+        # No tasks remain; don't block the semaphore on WorkerThread completion.
+        worker_semaphore.release()
 
     if is_blocking_call:
       num_to_do = total_tasks[task.caller_id]
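
The BoundedSemaphore change is a fairness fix: without it, one process's dispatch loop could drain the shared task queue far ahead of its own workers, starving sibling processes. A condensed sketch of the protocol, where WorkerPool and the zero-tasks sentinel are the names used in this patch and worker_semaphore is the threading.BoundedSemaphore(thread_count) created above:

    def DispatchTasks(task_queue, worker_pool, worker_semaphore,
                      zero_tasks_marker):
      while True:
        worker_semaphore.acquire()    # blocks once thread_count tasks are in flight
        task = task_queue.get()
        if task.args == zero_tasks_marker:
          worker_semaphore.release()  # nothing dispatched; hand the slot back
          return
        worker_pool.AddTask(task)     # paired release() is in the WorkerThread
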
@@ -1588,19 +1633,19 @@ class ProducerThread(threading.Thread):
       # It's possible that the workers finished before we updated total_tasks,
       # so we need to check here as well.
       _NotifyIfDone(self.caller_id,
-                    caller_id_finished_count.Get(self.caller_id))
+                    caller_id_finished_count.get(self.caller_id))
 
 
 class WorkerPool(object):
   """Pool of worker threads to which tasks can be added."""
 
-  def __init__(self, thread_count, logger, bucket_storage_uri_class=None,
-               gsutil_api_map=None, debug=0):
+  def __init__(self, thread_count, logger, worker_semaphore,
+               bucket_storage_uri_class=None, gsutil_api_map=None, debug=0):
     self.task_queue = _NewThreadsafeQueue()
     self.threads = []
     for _ in range(thread_count):
       worker_thread = WorkerThread(
-          self.task_queue, logger,
+          self.task_queue, logger, worker_semaphore=worker_semaphore,
           bucket_storage_uri_class=bucket_storage_uri_class,
           gsutil_api_map=gsutil_api_map, debug=debug)
       self.threads.append(worker_thread)
@@ -1620,14 +1665,16 @@ class WorkerThread(threading.Thread):
   calling logic is also used in the single-threaded case.
   """
 
-  def __init__(self, task_queue, logger, bucket_storage_uri_class=None,
-               gsutil_api_map=None, debug=0):
+  def __init__(self, task_queue, logger, worker_semaphore=None,
+               bucket_storage_uri_class=None, gsutil_api_map=None, debug=0):
     """Initializes the worker thread.
 
     Args:
       task_queue: The thread-safe queue from which this thread should obtain
                   its work.
       logger: Logger to use for this thread.
+      worker_semaphore: threading.BoundedSemaphore to be released each time a
+                        task is completed, or None for single-threaded execution.
      bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
                                Settable for testing/mocking.
      gsutil_api_map: Map of providers and API selector tuples to api classes
@@ -1637,6 +1684,7 @@ class WorkerThread(threading.Thread):
     """
     super(WorkerThread, self).__init__()
     self.task_queue = task_queue
+    self.worker_semaphore = worker_semaphore
     self.daemon = True
     self.cached_classes = {}
     self.shared_vars_updater = _SharedVariablesUpdater()
@@ -1658,7 +1706,8 @@ class WorkerThread(threading.Thread):
     try:
       results = task.func(cls, task.args, thread_state=self.thread_gsutil_api)
       if task.should_return_results:
-        global_return_values_map.Update(caller_id, [results], default_value=[])
+        global_return_values_map.Increment(caller_id, [results],
+                                           default_value=[])
     except Exception, e:  # pylint: disable=broad-except
       _IncrementFailureCount()
       if task.fail_on_error:
@@ -1673,15 +1722,15 @@ class WorkerThread(threading.Thread):
           'Caught exception while handling exception for %s:\n%s',
           task, traceback.format_exc())
     finally:
+      if self.worker_semaphore:
+        self.worker_semaphore.release()
       self.shared_vars_updater.Update(caller_id, cls)
 
       # Even if we encounter an exception, we still need to claim that
       # the function finished executing. Otherwise, we won't know when to
       # stop waiting and return results.
-      num_done = caller_id_finished_count.Update(caller_id, 1)
-
-      if cls.multiprocessing_is_available:
-        _NotifyIfDone(caller_id, num_done)
+      num_done = caller_id_finished_count.Increment(caller_id, 1)
+      _NotifyIfDone(caller_id, num_done)
 
   def run(self):
     while True:
@@ -1741,7 +1790,7 @@ class _SharedVariablesUpdater(object):
 
         # Update the globally-consistent value by simply increasing it by the
         # computed delta.
-        shared_vars_map.Update(key, delta)
+        shared_vars_map.Increment(key, delta)
 
 
 def _NotifyIfDone(caller_id, num_done):
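
Increment is the right primitive for _SharedVariablesUpdater because each worker publishes only the delta it produced since its last update, so concurrent workers compose additively instead of overwriting one another. A sketch of that delta-publication scheme, with last_sent_values standing in (hypothetically) for the updater's per-thread bookkeeping:

    def PublishDelta(shared_vars_map, last_sent_values, caller_id, name, cls):
      # last_sent_values is hypothetical; the real per-thread state lives
      # inside _SharedVariablesUpdater.
      key = (caller_id, name)
      new_value = getattr(cls, name)
      delta = new_value - last_sent_values.get(key, 0)
      if delta:
        last_sent_values[key] = new_value
        shared_vars_map.Increment(key, delta)  # atomic, so updates compose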