Chromium Code Reviews

Unified Diff: tools/telemetry/third_party/gsutilz/gslib/command.py

Issue 1376593003: Roll gsutil version to 4.15. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 3 months ago
Index: tools/telemetry/third_party/gsutilz/gslib/command.py
diff --git a/tools/telemetry/third_party/gsutilz/gslib/command.py b/tools/telemetry/third_party/gsutilz/gslib/command.py
index 3823b91505696bf08f8c3edd03d4af061a450575..1a1da9617582c6b9da76b5888c419f16db741ff0 100644
--- a/tools/telemetry/third_party/gsutilz/gslib/command.py
+++ b/tools/telemetry/third_party/gsutilz/gslib/command.py
@@ -50,20 +50,19 @@ from gslib.exception import CommandException
from gslib.help_provider import HelpProvider
from gslib.name_expansion import NameExpansionIterator
from gslib.name_expansion import NameExpansionResult
-from gslib.parallelism_framework_util import AtomicIncrementDict
-from gslib.parallelism_framework_util import BasicIncrementDict
-from gslib.parallelism_framework_util import ThreadAndProcessSafeDict
+from gslib.parallelism_framework_util import AtomicDict
from gslib.plurality_checkable_iterator import PluralityCheckableIterator
from gslib.sig_handling import RegisterSignalHandler
from gslib.storage_url import StorageUrlFromString
from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
from gslib.translation_helper import AclTranslation
+from gslib.translation_helper import PRIVATE_DEFAULT_OBJ_ACL
+from gslib.util import CheckMultiprocessingAvailableAndInit
from gslib.util import GetConfigFilePath
from gslib.util import GsutilStreamHandler
from gslib.util import HaveFileUrls
from gslib.util import HaveProviderUrls
from gslib.util import IS_WINDOWS
-from gslib.util import MultiprocessingIsAvailable
from gslib.util import NO_MAX
from gslib.util import UrlsAreForSingleProvider
from gslib.util import UTF8
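
Note on the import hunk: it collapses three specialized wrappers (AtomicIncrementDict, BasicIncrementDict, ThreadAndProcessSafeDict) into a single AtomicDict. Judging from the call sites later in this diff, AtomicDict supports plain dict-style access plus an atomic Increment(key, delta, default_value=...) that returns the new value, and takes an optional manager= so the same interface spans processes. A minimal sketch of those semantics (illustrative only, not gsutil's actual implementation):

    import threading

    class AtomicDictSketch(object):
      """Illustrative locked dict; gsutil's real AtomicDict also accepts a
      multiprocessing manager so the same interface spans processes."""

      def __init__(self, manager=None):
        # With a manager, state lives in manager.dict()/manager.Lock() and is
        # visible across processes; without one, plain in-process objects.
        self.lock = manager.Lock() if manager else threading.Lock()
        self.dict = manager.dict() if manager else {}

      def __getitem__(self, key):
        with self.lock:
          return self.dict[key]

      def __setitem__(self, key, value):
        with self.lock:
          self.dict[key] = value

      def get(self, key, default=None):
        with self.lock:
          return self.dict.get(key, default)

      def Increment(self, key, delta, default_value=0):
        # Atomic read-modify-write; the diff uses this both for counters
        # (delta=1) and for list accumulation (delta=[results]).
        with self.lock:
          value = self.dict.get(key, default_value) + delta
          self.dict[key] = value
          return value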
@@ -221,14 +220,14 @@ def InitializeMultiprocessingVariables():
caller_id_counter = multiprocessing.Value('i', 0)
# Map from caller_id to total number of tasks to be completed for that ID.
- total_tasks = ThreadAndProcessSafeDict(manager)
+ total_tasks = AtomicDict(manager=manager)
# Map from caller_id to a boolean which is True iff all its tasks are
# finished.
- call_completed_map = ThreadAndProcessSafeDict(manager)
+ call_completed_map = AtomicDict(manager=manager)
# Used to keep track of the set of return values for each caller ID.
- global_return_values_map = AtomicIncrementDict(manager)
+ global_return_values_map = AtomicDict(manager=manager)
# Condition used to notify any waiting threads that a task has finished or
# that a call to Apply needs a new set of consumer processes.
@@ -239,7 +238,7 @@ def InitializeMultiprocessingVariables():
worker_checking_level_lock = manager.Lock()
# Map from caller_id to the current number of completed tasks for that ID.
- caller_id_finished_count = AtomicIncrementDict(manager)
+ caller_id_finished_count = AtomicDict(manager=manager)
# Used as a way for the main thread to distinguish between being woken up
# by another call finishing and being woken up by a call that needs a new set
@@ -249,8 +248,8 @@ def InitializeMultiprocessingVariables():
current_max_recursive_level = multiprocessing.Value('i', 0)
# Map from (caller_id, name) to the value of that shared variable.
- shared_vars_map = AtomicIncrementDict(manager)
- shared_vars_list_map = ThreadAndProcessSafeDict(manager)
+ shared_vars_map = AtomicDict(manager=manager)
+ shared_vars_list_map = AtomicDict(manager=manager)
# Map from caller_id to calling class.
class_map = manager.dict()
@@ -259,6 +258,33 @@ def InitializeMultiprocessingVariables():
failure_count = multiprocessing.Value('i', 0)
+def InitializeThreadingVariables():
+ """Initializes module-level variables used when running multi-threaded.
+
+ When multiprocessing is not available (or on Windows where only 1 process
+ is used), thread-safe analogs to the multiprocessing global variables
+ must be initialized. This function is the thread-safe analog to
+ InitializeMultiprocessingVariables.
+ """
+ # pylint: disable=global-variable-undefined
+ global global_return_values_map, shared_vars_map, failure_count
+ global caller_id_finished_count, shared_vars_list_map, total_tasks
+ global need_pool_or_done_cond, call_completed_map, class_map
+ global task_queues, caller_id_lock, caller_id_counter
+ caller_id_counter = 0
+ caller_id_finished_count = AtomicDict()
+ caller_id_lock = threading.Lock()
+ call_completed_map = AtomicDict()
+ class_map = AtomicDict()
+ failure_count = 0
+ global_return_values_map = AtomicDict()
+ need_pool_or_done_cond = threading.Condition()
+ shared_vars_list_map = AtomicDict()
+ shared_vars_map = AtomicDict()
+ task_queues = []
+ total_tasks = AtomicDict()
+
+
# Each subclass of Command must define a property named 'command_spec' that is
# an instance of the following class.
CommandSpec = namedtuple('CommandSpec', [
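
InitializeThreadingVariables mirrors InitializeMultiprocessingVariables but swaps each process-shared primitive for an in-process analog: multiprocessing.Value becomes a plain int guarded by caller_id_lock, manager.Lock()/Condition() become their threading equivalents, and AtomicDict is built without a manager. A runnable miniature of those shapes (names follow the globals above):

    import threading

    # Thread-only analogs of the process-shared state set up above;
    # AtomicDict() without a manager degrades to an in-process locked dict
    # (see the sketch after the import hunk).
    caller_id_counter = 0                           # vs. multiprocessing.Value('i', 0)
    caller_id_lock = threading.Lock()               # vs. manager.Lock()
    need_pool_or_done_cond = threading.Condition()  # vs. manager.Condition()
    task_queues = []                                # filled per recursion level

    with caller_id_lock:
      caller_id_counter += 1  # plain-int path, cf. _SetUpPerCallerState below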
@@ -359,9 +385,9 @@ class Command(HelpProvider):
'command_name': self.command_name})))
return args
- def __init__(self, command_runner, args, headers, debug, parallel_operations,
- bucket_storage_uri_class, gsutil_api_class_map_factory,
- test_method=None, logging_filters=None,
+ def __init__(self, command_runner, args, headers, debug, trace_token,
+ parallel_operations, bucket_storage_uri_class,
+ gsutil_api_class_map_factory, logging_filters=None,
command_alias_used=None):
"""Instantiates a Command.
@@ -370,15 +396,13 @@ class Command(HelpProvider):
args: Command-line args (arg0 = actual arg, not command name ala bash).
headers: Dictionary containing optional HTTP headers to pass to boto.
debug: Debug level to pass in to boto connection (range 0..3).
+ trace_token: Trace token to pass to the API implementation.
parallel_operations: Should command operations be executed in parallel?
bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
Settable for testing/mocking.
gsutil_api_class_map_factory: Creates map of cloud storage interfaces.
Settable for testing/mocking.
- test_method: Optional general purpose method for testing purposes.
- Application and semantics of this method will vary by
- command and test type.
- logging_filters: Optional list of logging.Filters to apply to this
+    logging_filters: Optional list of logging.Filters to apply to this
command's logger.
command_alias_used: The alias that was actually used when running this
command (as opposed to the "official" command name,
@@ -395,10 +419,10 @@ class Command(HelpProvider):
self.unparsed_args = args
self.headers = headers
self.debug = debug
+ self.trace_token = trace_token
self.parallel_operations = parallel_operations
self.bucket_storage_uri_class = bucket_storage_uri_class
self.gsutil_api_class_map_factory = gsutil_api_class_map_factory
- self.test_method = test_method
self.exclude_symlinks = False
self.recursion_requested = False
self.all_versions = False
@@ -445,7 +469,7 @@ class Command(HelpProvider):
self.project_id = None
self.gsutil_api = CloudApiDelegator(
bucket_storage_uri_class, self.gsutil_api_map,
- self.logger, debug=self.debug)
+ self.logger, debug=self.debug, trace_token=self.trace_token)
# Cross-platform path to run gsutil binary.
self.gsutil_cmd = ''
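
The new trace_token argument follows a plumb-once pattern: accepted in __init__, stored on self, and forwarded to the CloudApiDelegator, so individual API calls never need to pass it. A trimmed illustration (the class bodies are stand-ins; only the parameter flow matches the hunk above):

    class _DelegatorSketch(object):
      # Stand-in for CloudApiDelegator: keeps the token for outgoing calls.
      def __init__(self, logger=None, debug=0, trace_token=None):
        self.logger, self.debug, self.trace_token = logger, debug, trace_token

    class _CommandSketch(object):
      def __init__(self, debug, trace_token):
        self.debug = debug
        self.trace_token = trace_token  # stored once at construction...
        self.gsutil_api = _DelegatorSketch(
            debug=self.debug, trace_token=self.trace_token)  # ...forwarded once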
@@ -464,7 +488,8 @@ class Command(HelpProvider):
self.recursion_requested = True
break
- self.multiprocessing_is_available = MultiprocessingIsAvailable()[0]
+ self.multiprocessing_is_available = (
+ CheckMultiprocessingAvailableAndInit().is_available)
def RaiseWrongNumberOfArgumentsException(self):
"""Raises exception for wrong number of arguments supplied to command."""
@@ -689,6 +714,10 @@ class Command(HelpProvider):
else:
def_obj_acl = AclTranslation.JsonToMessage(
self.acl_arg, apitools_messages.ObjectAccessControl)
+ if not def_obj_acl:
+ # Use a sentinel value to indicate a private (no entries) default
+ # object ACL.
+ def_obj_acl.append(PRIVATE_DEFAULT_OBJ_ACL)
bucket_metadata = apitools_messages.Bucket(
defaultObjectAcl=def_obj_acl)
gsutil_api.PatchBucket(url.bucket_name, bucket_metadata,
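
The sentinel guards against a patch-request ambiguity: a defaultObjectAcl with zero entries (a fully private bucket default) could otherwise be indistinguishable from "field not set" once empty fields are stripped from the request. A self-contained sketch of the pattern, with PRIVATE_MARKER standing in for the real PRIVATE_DEFAULT_OBJ_ACL message (the stripping rationale is a plausible reading of the hunk's comment, not stated in this diff):

    # PRIVATE_MARKER stands in for the PRIVATE_DEFAULT_OBJ_ACL message
    # imported at the top of this file.
    PRIVATE_MARKER = object()

    def default_obj_acl_for_patch(entries):
      # An empty list could be dropped from the patch request and read as
      # "leave unchanged", so an explicitly private (zero-entry) default
      # object ACL gets a sentinel a later translation layer recognizes.
      return list(entries) if entries else [PRIVATE_MARKER]

    assert default_obj_acl_for_patch([]) == [PRIVATE_MARKER]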
@@ -749,8 +778,6 @@ class Command(HelpProvider):
self.canned = False
else:
# No file exists, so expect a canned ACL string.
- # Canned ACLs are not supported in JSON and we need to use the XML API
- # to set them.
# validate=False because we allow wildcard urls.
storage_uri = boto.storage_uri(
url_args[0], debug=self.debug, validate=False,
@@ -866,8 +893,7 @@ class Command(HelpProvider):
'command' % (url_str, self.command_name))
return list(plurality_iter)[0]
- def _HandleMultiProcessingSigs(self, unused_signal_num,
- unused_cur_stack_frame):
+ def _HandleMultiProcessingSigs(self, signal_num, unused_cur_stack_frame):
"""Handles signals INT AND TERM during a multi-process/multi-thread request.
Kills subprocesses.
@@ -880,7 +906,8 @@ class Command(HelpProvider):
# https://github.com/GoogleCloudPlatform/gsutil/issues/99 for details
# about why making it work correctly across OS's is harder and still open.
ShutDownGsutil()
- sys.stderr.write('Caught ^C - exiting\n')
+ if signal_num == signal.SIGINT:
+ sys.stderr.write('Caught ^C - exiting\n')
# Simply calling sys.exit(1) doesn't work - see above bug for details.
KillProcess(os.getpid())
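
The handler now receives the real signal number so that only SIGINT prints the interactive "Caught ^C" message, while SIGTERM (for example, a parent process shutting gsutil down) exits quietly. A minimal, self-contained version of that registration (os._exit stands in for the KillProcess helper used above):

    import os
    import signal
    import sys

    def _handle_sigs_sketch(signal_num, unused_stack_frame):
      # Only a keyboard interrupt merits the interactive message; SIGTERM
      # should exit quietly.
      if signal_num == signal.SIGINT:
        sys.stderr.write('Caught ^C - exiting\n')
      os._exit(1)  # Stand-in for KillProcess(); sys.exit() can hang here.

    signal.signal(signal.SIGINT, _handle_sigs_sketch)
    signal.signal(signal.SIGTERM, _handle_sigs_sketch)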
@@ -1002,10 +1029,21 @@ class Command(HelpProvider):
def _SetUpPerCallerState(self):
"""Set up the state for a caller id, corresponding to one Apply call."""
+ # pylint: disable=global-variable-undefined,global-variable-not-assigned
+ # These variables are initialized in InitializeMultiprocessingVariables or
+ # InitializeThreadingVariables
+ global global_return_values_map, shared_vars_map, failure_count
+ global caller_id_finished_count, shared_vars_list_map, total_tasks
+ global need_pool_or_done_cond, call_completed_map, class_map
+ global task_queues, caller_id_lock, caller_id_counter
# Get a new caller ID.
with caller_id_lock:
- caller_id_counter.value += 1
- caller_id = caller_id_counter.value
+ if isinstance(caller_id_counter, int):
+ caller_id_counter += 1
+ caller_id = caller_id_counter
+ else:
+ caller_id_counter.value += 1
+ caller_id = caller_id_counter.value
# Create a copy of self with an incremented recursive level. This allows
# the class to report its level correctly if the function called from it
@@ -1025,8 +1063,8 @@ class Command(HelpProvider):
class_map[caller_id] = cls
total_tasks[caller_id] = -1 # -1 => the producer hasn't finished yet.
call_completed_map[caller_id] = False
- caller_id_finished_count.Put(caller_id, 0)
- global_return_values_map.Put(caller_id, [])
+ caller_id_finished_count[caller_id] = 0
+ global_return_values_map[caller_id] = []
return caller_id
def _CreateNewConsumerPool(self, num_processes, num_threads):
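
The isinstance branch above is the meeting point of the two initialization paths: after InitializeThreadingVariables, caller_id_counter is a plain int; after InitializeMultiprocessingVariables, it is a multiprocessing.Value with a .value field. Since ints are immutable, the real code rebinds a module-level global under caller_id_lock; the sketch below returns the new counter instead, to stay self-contained:

    import multiprocessing
    import threading

    caller_id_lock = threading.Lock()

    def next_caller_id(counter):
      # Handles both shapes; also returns the (possibly rebound) counter,
      # because ints are immutable and cannot be mutated in place.
      with caller_id_lock:
        if isinstance(counter, int):            # threading path
          counter += 1
          return counter, counter
        counter.value += 1                      # multiprocessing.Value path
        return counter, counter.value

    counter, cid = next_caller_id(0)                               # cid == 1
    counter, cid = next_caller_id(multiprocessing.Value('i', 0))   # cid == 1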
@@ -1103,32 +1141,16 @@ class Command(HelpProvider):
# fact that it's wasteful to try this multiple times in general, it also
# will never work when called from a subprocess since we use daemon
# processes, and daemons can't create other processes.
- if is_main_thread:
- if ((not self.multiprocessing_is_available)
- and thread_count * process_count > 1):
- # Run the check again and log the appropriate warnings. This was run
- # before, when the Command object was created, in order to calculate
- # self.multiprocessing_is_available, but we don't want to print the
- # warning until we're sure the user actually tried to use multiple
- # threads or processes.
- MultiprocessingIsAvailable(logger=self.logger)
-
- if self.multiprocessing_is_available:
- caller_id = self._SetUpPerCallerState()
- else:
- self.sequential_caller_id += 1
- caller_id = self.sequential_caller_id
-
- if is_main_thread:
- # pylint: disable=global-variable-undefined
- global global_return_values_map, shared_vars_map, failure_count
- global caller_id_finished_count, shared_vars_list_map
- global_return_values_map = BasicIncrementDict()
- global_return_values_map.Put(caller_id, [])
- shared_vars_map = BasicIncrementDict()
- caller_id_finished_count = BasicIncrementDict()
- shared_vars_list_map = {}
- failure_count = 0
+ if (is_main_thread and not self.multiprocessing_is_available and
+ process_count > 1):
+ # Run the check again and log the appropriate warnings. This was run
+ # before, when the Command object was created, in order to calculate
+ # self.multiprocessing_is_available, but we don't want to print the
+ # warning until we're sure the user actually tried to use multiple
+ # threads or processes.
+ CheckMultiprocessingAvailableAndInit(logger=self.logger)
+
+ caller_id = self._SetUpPerCallerState()
# If any shared attributes passed by caller, create a dictionary of
# shared memory variables for every element in the list of shared
@@ -1136,12 +1158,14 @@ class Command(HelpProvider):
if shared_attrs:
shared_vars_list_map[caller_id] = shared_attrs
for name in shared_attrs:
- shared_vars_map.Put((caller_id, name), 0)
+ shared_vars_map[(caller_id, name)] = 0
# Make all of the requested function calls.
- if self.multiprocessing_is_available and thread_count * process_count > 1:
+ usable_processes_count = (process_count if self.multiprocessing_is_available
+ else 1)
+ if thread_count * usable_processes_count > 1:
self._ParallelApply(func, args_iterator, exception_handler, caller_id,
- arg_checker, process_count, thread_count,
+ arg_checker, usable_processes_count, thread_count,
should_return_results, fail_on_error)
else:
self._SequentialApply(func, args_iterator, exception_handler, caller_id,
@@ -1153,11 +1177,11 @@ class Command(HelpProvider):
# and simply apply the delta after what was done during the call to
# apply.
final_value = (original_shared_vars_values[name] +
- shared_vars_map.Get((caller_id, name)))
+ shared_vars_map.get((caller_id, name)))
setattr(self, name, final_value)
if should_return_results:
- return global_return_values_map.Get(caller_id)
+ return global_return_values_map.get(caller_id)
def _MaybeSuggestGsutilDashM(self):
"""Outputs a sugestion to the user to use gsutil -m."""
@@ -1222,6 +1246,10 @@ class Command(HelpProvider):
# start and it scrolled off-screen.
self._MaybeSuggestGsutilDashM()
+ # If the final iterated argument results in an exception, and that
+ # exception modifies shared_attrs, we need to publish the results.
+ worker_thread.shared_vars_updater.Update(caller_id, self)
+
# pylint: disable=g-doc-args
def _ParallelApply(self, func, args_iterator, exception_handler, caller_id,
arg_checker, process_count, thread_count,
@@ -1283,7 +1311,10 @@ class Command(HelpProvider):
# of task queues if it makes a call to Apply, so we always keep around
# one more queue than we know we need. OTOH, if we don't create a new
# process, the existing process still needs a task queue to use.
- task_queues.append(_NewMultiprocessingQueue())
+ if process_count > 1:
+ task_queues.append(_NewMultiprocessingQueue())
+ else:
+ task_queues.append(_NewThreadsafeQueue())
if process_count > 1: # Handle process pool creation.
# Check whether this call will need a new set of workers.
@@ -1315,8 +1346,10 @@ class Command(HelpProvider):
# be consumer pools trying to use our processes.
if process_count > 1:
task_queue = task_queues[self.recursive_apply_level]
- else:
+ elif self.multiprocessing_is_available:
task_queue = _NewMultiprocessingQueue()
+ else:
+ task_queue = _NewThreadsafeQueue()
# Kick off a producer thread to throw tasks in the global task queue. We
# do this asynchronously so that the main thread can be free to create new
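
Falling back to a thread-safe queue when only one process is in play avoids the cost of a multiprocessing queue, which pickles every item and runs a feeder thread. The two factory helpers referenced here are defined elsewhere in command.py; the bodies below are assumptions that capture the distinction:

    import multiprocessing
    try:
      import Queue  # Python 2 name, matching this file's vintage.
    except ImportError:
      import queue as Queue

    _QUEUE_SIZE = 32768  # Illustrative bound; gsutil defines its own constant.

    def _new_multiprocessing_queue():
      # Crosses process boundaries: items are pickled and pushed by a feeder
      # thread, overhead worth paying only when process_count > 1.
      return multiprocessing.Queue(_QUEUE_SIZE)

    def _new_threadsafe_queue():
      # In-process only: no pickling, no feeder thread.
      return Queue.Queue(_QUEUE_SIZE)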
@@ -1394,16 +1427,25 @@ class Command(HelpProvider):
task_queue = task_queue or task_queues[recursive_apply_level]
+ # Ensure fairness across processes by filling our WorkerPool only with
+ # as many tasks as it has WorkerThreads. This semaphore is acquired each
+ # time that a task is retrieved from the queue and released each time
+ # a task is completed by a WorkerThread.
+ worker_semaphore = threading.BoundedSemaphore(thread_count)
+
assert thread_count * process_count > 1, (
'Invalid state, calling command._ApplyThreads with only one thread '
'and process.')
+ # TODO: Presently, this pool gets recreated with each call to Apply. We
+ # should be able to do it just once, at process creation time.
worker_pool = WorkerPool(
- thread_count, self.logger,
+ thread_count, self.logger, worker_semaphore,
bucket_storage_uri_class=self.bucket_storage_uri_class,
gsutil_api_map=self.gsutil_api_map, debug=self.debug)
num_enqueued = 0
while True:
+ worker_semaphore.acquire()
task = task_queue.get()
if task.args != ZERO_TASKS_TO_DO_ARGUMENT:
# If we have no tasks to do and we're performing a blocking call, we
@@ -1411,6 +1453,9 @@ class Command(HelpProvider):
# the call to task_queue.get() forever.
worker_pool.AddTask(task)
num_enqueued += 1
+ else:
+ # No tasks remain; don't block the semaphore on WorkerThread completion.
+ worker_semaphore.release()
if is_blocking_call:
num_to_do = total_tasks[task.caller_id]
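
The bounded semaphore enforces cross-process fairness: the dispatch loop acquires before each task_queue.get(), and a WorkerThread releases on completion, so at most thread_count tasks are in flight per process and one process cannot drain a queue that other processes' idle threads could be serving. The zero-tasks marker releases immediately since no worker ever sees it. A condensed sketch of the handoff (None stands in for ZERO_TASKS_TO_DO_ARGUMENT):

    import threading
    try:
      import Queue
    except ImportError:
      import queue as Queue

    THREAD_COUNT = 4
    worker_semaphore = threading.BoundedSemaphore(THREAD_COUNT)
    task_queue = Queue.Queue()

    def dispatch_loop(add_task_to_worker_pool):
      while True:
        worker_semaphore.acquire()  # blocks once THREAD_COUNT tasks in flight
        task = task_queue.get()
        if task is None:            # stand-in for ZERO_TASKS_TO_DO_ARGUMENT
          worker_semaphore.release()  # nothing handed to a worker; give it back
          continue
        add_task_to_worker_pool(task)  # the worker releases when the task ends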
@@ -1588,19 +1633,19 @@ class ProducerThread(threading.Thread):
# It's possible that the workers finished before we updated total_tasks,
# so we need to check here as well.
_NotifyIfDone(self.caller_id,
- caller_id_finished_count.Get(self.caller_id))
+ caller_id_finished_count.get(self.caller_id))
class WorkerPool(object):
"""Pool of worker threads to which tasks can be added."""
- def __init__(self, thread_count, logger, bucket_storage_uri_class=None,
- gsutil_api_map=None, debug=0):
+ def __init__(self, thread_count, logger, worker_semaphore,
+ bucket_storage_uri_class=None, gsutil_api_map=None, debug=0):
self.task_queue = _NewThreadsafeQueue()
self.threads = []
for _ in range(thread_count):
worker_thread = WorkerThread(
- self.task_queue, logger,
+ self.task_queue, logger, worker_semaphore=worker_semaphore,
bucket_storage_uri_class=bucket_storage_uri_class,
gsutil_api_map=gsutil_api_map, debug=debug)
self.threads.append(worker_thread)
@@ -1620,14 +1665,16 @@ class WorkerThread(threading.Thread):
calling logic is also used in the single-threaded case.
"""
- def __init__(self, task_queue, logger, bucket_storage_uri_class=None,
- gsutil_api_map=None, debug=0):
+ def __init__(self, task_queue, logger, worker_semaphore=None,
+ bucket_storage_uri_class=None, gsutil_api_map=None, debug=0):
"""Initializes the worker thread.
Args:
task_queue: The thread-safe queue from which this thread should obtain
its work.
logger: Logger to use for this thread.
+ worker_semaphore: threading.BoundedSemaphore to be released each time a
+ task is completed, or None for single-threaded execution.
bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
Settable for testing/mocking.
gsutil_api_map: Map of providers and API selector tuples to api classes
@@ -1637,6 +1684,7 @@ class WorkerThread(threading.Thread):
"""
super(WorkerThread, self).__init__()
self.task_queue = task_queue
+ self.worker_semaphore = worker_semaphore
self.daemon = True
self.cached_classes = {}
self.shared_vars_updater = _SharedVariablesUpdater()
@@ -1658,7 +1706,8 @@ class WorkerThread(threading.Thread):
try:
results = task.func(cls, task.args, thread_state=self.thread_gsutil_api)
if task.should_return_results:
- global_return_values_map.Update(caller_id, [results], default_value=[])
+ global_return_values_map.Increment(caller_id, [results],
+ default_value=[])
except Exception, e: # pylint: disable=broad-except
_IncrementFailureCount()
if task.fail_on_error:
@@ -1673,15 +1722,15 @@ class WorkerThread(threading.Thread):
'Caught exception while handling exception for %s:\n%s',
task, traceback.format_exc())
finally:
+ if self.worker_semaphore:
+ self.worker_semaphore.release()
self.shared_vars_updater.Update(caller_id, cls)
# Even if we encounter an exception, we still need to claim that the
# function finished executing. Otherwise, we won't know when to
# stop waiting and return results.
- num_done = caller_id_finished_count.Update(caller_id, 1)
-
- if cls.multiprocessing_is_available:
- _NotifyIfDone(caller_id, num_done)
+ num_done = caller_id_finished_count.Increment(caller_id, 1)
+ _NotifyIfDone(caller_id, num_done)
def run(self):
while True:
@@ -1741,7 +1790,7 @@ class _SharedVariablesUpdater(object):
# Update the globally-consistent value by simply increasing it by the
# computed delta.
- shared_vars_map.Update(key, delta)
+ shared_vars_map.Increment(key, delta)
def _NotifyIfDone(caller_id, num_done):
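
On _SharedVariablesUpdater's role in the hunk above: each worker thread tracks the last value it published per (caller_id, name) key and publishes only the delta via the atomic Increment, so concurrent workers' contributions add up instead of overwriting each other. A sketch of that bookkeeping (assuming an AtomicDict-style Increment like the one sketched after the import hunk):

    class SharedVarsUpdaterSketch(object):
      """Per-thread bookkeeping: publish deltas, never absolute values."""

      def __init__(self):
        self.last_sent = {}  # (caller_id, name) -> last value published

      def Update(self, caller_id, obj, var_names, shared_vars_map):
        for name in var_names:
          key = (caller_id, name)
          value = getattr(obj, name)
          delta = value - self.last_sent.get(key, 0)
          if delta:
            # Atomic read-modify-write on the shared map, so concurrent
            # threads' deltas accumulate instead of overwriting.
            shared_vars_map.Increment(key, delta)
            self.last_sent[key] = value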
