| Index: third_party/gsutil/gslib/util.py
|
| diff --git a/third_party/gsutil/gslib/util.py b/third_party/gsutil/gslib/util.py
|
| index ece5112c34252b958b1bb5020541869c26401ff5..df07671bb682a8ef188e4524c2ab822b542d3fd4 100644
|
| --- a/third_party/gsutil/gslib/util.py
|
| +++ b/third_party/gsutil/gslib/util.py
|
| @@ -16,6 +16,7 @@
|
|
|
| from __future__ import absolute_import
|
|
|
| +import collections
|
| import errno
|
| import logging
|
| import math
|
| @@ -51,6 +52,24 @@ from gslib.translation_helper import S3_ACL_MARKER_GUID
|
| from gslib.translation_helper import S3_DELETE_MARKER_GUID
|
| from gslib.translation_helper import S3_MARKER_GUIDS
|
|
|
| +# Detect platform types.
|
| +PLATFORM = str(sys.platform).lower()
|
| +IS_WINDOWS = 'win32' in PLATFORM
|
| +IS_CYGWIN = 'cygwin' in PLATFORM
|
| +IS_LINUX = 'linux' in PLATFORM
|
| +IS_OSX = 'darwin' in PLATFORM
|
| +
|
| +# pylint: disable=g-import-not-at-top
|
| +if IS_WINDOWS:
|
| + from ctypes import c_int
|
| + from ctypes import c_uint64
|
| + from ctypes import c_char_p
|
| + from ctypes import c_wchar_p
|
| + from ctypes import windll
|
| + from ctypes import POINTER
|
| + from ctypes import WINFUNCTYPE
|
| + from ctypes import WinError
|
| +
|
| # pylint: disable=g-import-not-at-top
|
| try:
|
| # This module doesn't necessarily exist on Windows.
|
| @@ -114,16 +133,6 @@ certs_file_lock = threading.Lock()
|
| configured_certs_files = []
|
|
|
|
|
| -def InitializeMultiprocessingVariables():
|
| - """Perform necessary initialization for multiprocessing.
|
| -
|
| - See gslib.command.InitializeMultiprocessingVariables for an explanation
|
| - of why this is necessary.
|
| - """
|
| - global manager # pylint: disable=global-variable-undefined
|
| - manager = multiprocessing.Manager()
|
| -
|
| -
|
| def _GenerateSuffixRegex():
|
| """Creates a suffix regex for human-readable byte counts."""
|
| human_bytes_re = r'(?P<num>\d*\.\d+|\d+)\s*(?P<suffix>%s)?'
|
| @@ -142,13 +151,6 @@ SUFFIX_TO_SI, MATCH_HUMAN_BYTES = _GenerateSuffixRegex()
|
|
|
| SECONDS_PER_DAY = 3600 * 24
|
|
|
| -# Detect platform types.
|
| -PLATFORM = str(sys.platform).lower()
|
| -IS_WINDOWS = 'win32' in PLATFORM
|
| -IS_CYGWIN = 'cygwin' in PLATFORM
|
| -IS_LINUX = 'linux' in PLATFORM
|
| -IS_OSX = 'darwin' in PLATFORM
|
| -
|
| # On Unix-like systems, we will set the maximum number of open files to avoid
|
| # hitting the limit imposed by the OS. This number was obtained experimentally.
|
| MIN_ACCEPTABLE_OPEN_FILES_LIMIT = 1000
|
| @@ -173,8 +175,48 @@ class ListingStyle(object):
|
|
|
|
|
| def UsingCrcmodExtension(crcmod):
|
| - return (getattr(crcmod, 'crcmod', None) and
|
| - getattr(crcmod.crcmod, '_usingExtension', None))
|
| + return (boto.config.get('GSUtil', 'test_assume_fast_crcmod', None) or
|
| + (getattr(crcmod, 'crcmod', None) and
|
| + getattr(crcmod.crcmod, '_usingExtension', None)))
|
| +
|
| +
|
| +def CheckFreeSpace(path):
|
| + """Return free bytes for path, or for the SystemDrive volume on Windows."""
|
| + if IS_WINDOWS:
|
| + try:
|
| + # pylint: disable=invalid-name
|
| + get_disk_free_space_ex = WINFUNCTYPE(c_int, c_wchar_p,
|
| + POINTER(c_uint64),
|
| + POINTER(c_uint64),
|
| + POINTER(c_uint64))
|
| + get_disk_free_space_ex = get_disk_free_space_ex(
|
| + ('GetDiskFreeSpaceExW', windll.kernel32), (
|
| + (1, 'lpszPathName'),  # Flag 1: ctypes input parameter.
|
| + (2, 'lpFreeUserSpace'),  # Flag 2: ctypes output parameter.
|
| + (2, 'lpTotalSpace'),
|
| + (2, 'lpFreeSpace'),))
|
| + except AttributeError:  # Fall back to GetDiskFreeSpaceExA (c_char_p path).
|
| + get_disk_free_space_ex = WINFUNCTYPE(c_int, c_char_p,
|
| + POINTER(c_uint64),
|
| + POINTER(c_uint64),
|
| + POINTER(c_uint64))
|
| + get_disk_free_space_ex = get_disk_free_space_ex(
|
| + ('GetDiskFreeSpaceExA', windll.kernel32), (
|
| + (1, 'lpszPathName'),
|
| + (2, 'lpFreeUserSpace'),
|
| + (2, 'lpTotalSpace'),
|
| + (2, 'lpFreeSpace'),))
|
| +
|
| + def GetDiskFreeSpaceExErrCheck(result, unused_func, args):
|
| + if not result:
|
| + raise WinError()
|
| + return args[1].value  # The 'lpFreeUserSpace' value is the call's result.
|
| + get_disk_free_space_ex.errcheck = GetDiskFreeSpaceExErrCheck
|
| +
|
| + return get_disk_free_space_ex(os.getenv('SystemDrive'))  # path is not used.
|
| + else:
|
| + (_, f_frsize, _, _, f_bavail, _, _, _, _, _) = os.statvfs(path)
|
| + return f_frsize * f_bavail  # Fragment size times available block count.
|
|
|
|
|
| def CreateDirIfNeeded(dir_path, mode=0777):
|
| @@ -192,6 +234,25 @@ def CreateDirIfNeeded(dir_path, mode=0777):
|
| raise
|
|
|
|
|
| +def DivideAndCeil(dividend, divisor):
|
| + """Returns ceil(dividend / divisor).
|
| +
|
| + Takes care to avoid the pitfalls of floating point arithmetic that could
|
| + otherwise yield the wrong result for large numbers.
|
| +
|
| + Args:
|
| + dividend: Dividend for the operation.
|
| + divisor: Divisor for the operation; must be nonzero.
|
| +
|
| + Returns:
|
| + The quotient, rounded up when the division leaves a remainder.
|
| + """
|
| + quotient = dividend // divisor  # Floor division.
|
| + if (dividend % divisor) != 0:  # Nonzero remainder: round up by one.
|
| + quotient += 1
|
| + return quotient
|
| +
|
| +
|
| def GetGsutilStateDir():
|
| """Returns the location of the directory for gsutil state files.
|
|
|
| @@ -439,12 +500,15 @@ def GetNewHttp(http_class=httplib2.Http, **kwargs):
|
| return http
|
|
|
|
|
| +# Retry for 10 minutes with exponential backoff, which corresponds to
|
| +# the maximum Downtime Period specified in the GCS SLA
|
| +# (https://cloud.google.com/storage/sla)
|
| def GetNumRetries():
|
| - return config.getint('Boto', 'num_retries', 6)
|
| + return config.getint('Boto', 'num_retries', 23)
|
|
|
|
|
| def GetMaxRetryDelay():
|
| - return config.getint('Boto', 'max_retry_delay', 60)
|
| + return config.getint('Boto', 'max_retry_delay', 32)
|
|
|
|
|
| # Resumable downloads and uploads make one HTTP call per chunk (and must be
|
| @@ -890,8 +954,12 @@ def HaveProviderUrls(args_to_check):
|
| return True
|
| return False
|
|
|
| +# This must be defined at the module level for pickling across processes.
|
| +MultiprocessingIsAvailableResult = collections.namedtuple(
|
| + 'MultiprocessingIsAvailableResult', ['is_available', 'stack_trace'])
|
|
|
| -def MultiprocessingIsAvailable(logger=None):
|
| +
|
| +def CheckMultiprocessingAvailableAndInit(logger=None):
|
| """Checks if multiprocessing is available.
|
|
|
| There are some environments in which there is no way to use multiprocessing
|
| @@ -900,6 +968,10 @@ def MultiprocessingIsAvailable(logger=None):
|
| needed to make sure the environment can support the pieces of the
|
| multiprocessing module that we need.
|
|
|
| + If multiprocessing is available, this performs necessary initialization for
|
| + multiprocessing. See gslib.command.InitializeMultiprocessingVariables for
|
| + an explanation of why this is necessary.
|
| +
|
| Args:
|
| logger: logging.logger to use for debug output.
|
|
|
| @@ -917,16 +989,27 @@ def MultiprocessingIsAvailable(logger=None):
|
| if logger:
|
| logger.debug(cached_multiprocessing_check_stack_trace)
|
| logger.warn(cached_multiprocessing_is_available_message)
|
| - return (cached_multiprocessing_is_available,
|
| - cached_multiprocessing_check_stack_trace)
|
| + return MultiprocessingIsAvailableResult(
|
| + is_available=cached_multiprocessing_is_available,
|
| + stack_trace=cached_multiprocessing_check_stack_trace)
|
| +
|
| + if IS_WINDOWS:
|
| + message = """
|
| +Multiple processes are not supported on Windows. Operations requesting
|
| +parallelism will be executed with multiple threads in a single process only.
|
| +"""
|
| + if logger:
|
| + logger.warn(message)
|
| + return MultiprocessingIsAvailableResult(is_available=False,
|
| + stack_trace=None)
|
|
|
| stack_trace = None
|
| multiprocessing_is_available = True
|
| message = """
|
| -You have requested multiple threads or processes for an operation, but the
|
| +You have requested multiple processes for an operation, but the
|
| required functionality of Python\'s multiprocessing module is not available.
|
| -Your operations will be performed sequentially, and any requests for
|
| -parallelism will be ignored.
|
| +Operations requesting parallelism will be executed with multiple threads in a
|
| +single process only.
|
| """
|
| try:
|
| # Fails if /dev/shm (or some equivalent thereof) is not available for use
|
| @@ -934,8 +1017,7 @@ parallelism will be ignored.
|
| try:
|
| multiprocessing.Value('i', 0)
|
| except:
|
| - if not IS_WINDOWS:
|
| - message += """
|
| + message += """
|
| Please ensure that you have write access to both /dev/shm and /run/shm.
|
| """
|
| raise # We'll handle this in one place below.
|
| @@ -944,7 +1026,9 @@ Please ensure that you have write access to both /dev/shm and /run/shm.
|
| # out as a sanity check. This definitely works on some versions of Windows,
|
| # but it's certainly possible that there is some unknown configuration for
|
| # which it won't.
|
| - multiprocessing.Manager()
|
| + global manager # pylint: disable=global-variable-undefined
|
| +
|
| + manager = multiprocessing.Manager()
|
|
|
| # Check that the max number of open files is reasonable. Always check this
|
| # after we're sure that the basic multiprocessing functionality is
|
| @@ -968,7 +1052,7 @@ Please ensure that you have write access to both /dev/shm and /run/shm.
|
| except AttributeError:
|
| pass
|
|
|
| - if limit < MIN_ACCEPTABLE_OPEN_FILES_LIMIT and not IS_WINDOWS:
|
| + if limit < MIN_ACCEPTABLE_OPEN_FILES_LIMIT:
|
| message += ("""
|
| Your max number of open files, %s, is too low to allow safe multiprocessing.
|
| On Linux you can fix this by adding something like "ulimit -n 10000" to your
|
| @@ -995,7 +1079,9 @@ Alternatively, edit /etc/launchd.conf with something like:
|
| cached_multiprocessing_is_available = multiprocessing_is_available
|
| cached_multiprocessing_check_stack_trace = stack_trace
|
| cached_multiprocessing_is_available_message = message
|
| - return (multiprocessing_is_available, stack_trace)
|
| + return MultiprocessingIsAvailableResult(
|
| + is_available=cached_multiprocessing_is_available,
|
| + stack_trace=cached_multiprocessing_check_stack_trace)
|
|
|
|
|
| def CreateLock():
|
| @@ -1007,7 +1093,7 @@ def CreateLock():
|
| Returns:
|
| Multiprocessing or threading lock.
|
| """
|
| - if MultiprocessingIsAvailable()[0]:
|
| + if CheckMultiprocessingAvailableAndInit().is_available:
|
| return manager.Lock()
|
| else:
|
| return threading.Lock()
|
|
|