Index: tools/telemetry/third_party/gsutilz/gslib/util.py |
diff --git a/tools/telemetry/third_party/gsutilz/gslib/util.py b/tools/telemetry/third_party/gsutilz/gslib/util.py |
index ece5112c34252b958b1bb5020541869c26401ff5..df07671bb682a8ef188e4524c2ab822b542d3fd4 100644 |
--- a/tools/telemetry/third_party/gsutilz/gslib/util.py |
+++ b/tools/telemetry/third_party/gsutilz/gslib/util.py |
@@ -16,6 +16,7 @@ |
from __future__ import absolute_import |
+import collections |
import errno |
import logging |
import math |
@@ -51,6 +52,24 @@ from gslib.translation_helper import S3_ACL_MARKER_GUID |
from gslib.translation_helper import S3_DELETE_MARKER_GUID |
from gslib.translation_helper import S3_MARKER_GUIDS |
+# Detect platform types. |
+PLATFORM = str(sys.platform).lower() |
+IS_WINDOWS = 'win32' in PLATFORM |
+IS_CYGWIN = 'cygwin' in PLATFORM |
+IS_LINUX = 'linux' in PLATFORM |
+IS_OSX = 'darwin' in PLATFORM |
+ |
+# pylint: disable=g-import-not-at-top |
+if IS_WINDOWS: |
+ from ctypes import c_int |
+ from ctypes import c_uint64 |
+ from ctypes import c_char_p |
+ from ctypes import c_wchar_p |
+ from ctypes import windll |
+ from ctypes import POINTER |
+ from ctypes import WINFUNCTYPE |
+ from ctypes import WinError |
+ |
# pylint: disable=g-import-not-at-top |
try: |
# This module doesn't necessarily exist on Windows. |
@@ -114,16 +133,6 @@ certs_file_lock = threading.Lock() |
configured_certs_files = [] |
-def InitializeMultiprocessingVariables(): |
- """Perform necessary initialization for multiprocessing. |
- |
- See gslib.command.InitializeMultiprocessingVariables for an explanation |
- of why this is necessary. |
- """ |
- global manager # pylint: disable=global-variable-undefined |
- manager = multiprocessing.Manager() |
- |
- |
def _GenerateSuffixRegex(): |
"""Creates a suffix regex for human-readable byte counts.""" |
human_bytes_re = r'(?P<num>\d*\.\d+|\d+)\s*(?P<suffix>%s)?' |
@@ -142,13 +151,6 @@ SUFFIX_TO_SI, MATCH_HUMAN_BYTES = _GenerateSuffixRegex() |
SECONDS_PER_DAY = 3600 * 24 |
-# Detect platform types. |
-PLATFORM = str(sys.platform).lower() |
-IS_WINDOWS = 'win32' in PLATFORM |
-IS_CYGWIN = 'cygwin' in PLATFORM |
-IS_LINUX = 'linux' in PLATFORM |
-IS_OSX = 'darwin' in PLATFORM |
- |
# On Unix-like systems, we will set the maximum number of open files to avoid |
# hitting the limit imposed by the OS. This number was obtained experimentally. |
MIN_ACCEPTABLE_OPEN_FILES_LIMIT = 1000 |
@@ -173,8 +175,48 @@ class ListingStyle(object): |
def UsingCrcmodExtension(crcmod): |
- return (getattr(crcmod, 'crcmod', None) and |
- getattr(crcmod.crcmod, '_usingExtension', None)) |
+  """Checks whether crcmod reports that its extension is in use. |
+ |
+  Args: |
+    crcmod: Object expected to expose a 'crcmod' attribute. |
+ |
+  Returns: |
+    The 'test_assume_fast_crcmod' value from the GSUtil boto config section |
+    when it is truthy; otherwise crcmod.crcmod._usingExtension, or a falsy |
+    value if either attribute is missing. |
+  """ |
+  assumed_fast = boto.config.get('GSUtil', 'test_assume_fast_crcmod', None) |
+  if assumed_fast: |
+    return assumed_fast |
+  inner_module = getattr(crcmod, 'crcmod', None) |
+  if not inner_module: |
+    return inner_module |
+  return getattr(inner_module, '_usingExtension', None) |
+ |
+ |
+def CheckFreeSpace(path): |
+  """Returns the free space, in bytes, of the volume that holds path. |
+ |
+  On Windows this calls GetDiskFreeSpaceEx for the drive that contains path, |
+  falling back to the SystemDrive environment value when no drive can be |
+  derived from path. On other platforms it uses os.statvfs(path). |
+ |
+  Args: |
+    path: File or directory path located on the volume to check. |
+ |
+  Returns: |
+    Free byte count: lpFreeUserSpace on Windows, f_frsize * f_bavail |
+    elsewhere. |
+ |
+  Raises: |
+    OSError: If the OS query fails (on Windows this is the error object |
+        built by ctypes.WinError()). |
+  """ |
+  if not IS_WINDOWS: |
+    fs_stats = os.statvfs(path) |
+    return fs_stats.f_frsize * fs_stats.f_bavail |
+ |
+  # One input parameter (flag 1) followed by three outputs (flag 2). |
+  param_flags = ((1, 'lpszPathName'), |
+                 (2, 'lpFreeUserSpace'), |
+                 (2, 'lpTotalSpace'), |
+                 (2, 'lpFreeSpace')) |
+  out_types = (POINTER(c_uint64), POINTER(c_uint64), POINTER(c_uint64)) |
+  try: |
+    # pylint: disable=invalid-name |
+    prototype = WINFUNCTYPE(c_int, c_wchar_p, *out_types) |
+    get_disk_free_space_ex = prototype( |
+        ('GetDiskFreeSpaceExW', windll.kernel32), param_flags) |
+  except AttributeError: |
+    # Binding the wide-character entry point failed; use the ANSI one. |
+    prototype = WINFUNCTYPE(c_int, c_char_p, *out_types) |
+    get_disk_free_space_ex = prototype( |
+        ('GetDiskFreeSpaceExA', windll.kernel32), param_flags) |
+ |
+  def GetDiskFreeSpaceExErrCheck(result, unused_func, args): |
+    # A zero result signals failure. |
+    if not result: |
+      raise WinError() |
+    # args[1] is the lpFreeUserSpace output parameter. |
+    return args[1].value |
+  get_disk_free_space_ex.errcheck = GetDiskFreeSpaceExErrCheck |
+ |
+  # Query the drive that contains path rather than assuming the system |
+  # drive, so that paths on other volumes are reported correctly. |
+  drive = os.path.splitdrive(os.path.abspath(path))[0] if path else '' |
+  if drive: |
+    # The trailing separator selects the drive root (needed for UNC names). |
+    return get_disk_free_space_ex(drive + os.sep) |
+  return get_disk_free_space_ex(os.getenv('SystemDrive')) |
def CreateDirIfNeeded(dir_path, mode=0777): |
@@ -192,6 +234,25 @@ def CreateDirIfNeeded(dir_path, mode=0777): |
raise |
+def DivideAndCeil(dividend, divisor): |
+  """Computes ceil(dividend / divisor) using floor division and modulo. |
+ |
+  True division is never performed, so integer operands of any size are |
+  not exposed to floating point rounding. |
+ |
+  Args: |
+    dividend: Value to be divided. |
+    divisor: Value to divide by. |
+ |
+  Returns: |
+    The floor quotient, plus one when the division leaves a remainder. |
+  """ |
+  whole, remainder = divmod(dividend, divisor) |
+  if remainder: |
+    return whole + 1 |
+  return whole |
+ |
+ |
def GetGsutilStateDir(): |
"""Returns the location of the directory for gsutil state files. |
@@ -439,12 +500,15 @@ def GetNewHttp(http_class=httplib2.Http, **kwargs): |
return http |
+# Retry for 10 minutes with exponential backoff, which corresponds to |
+# the maximum Downtime Period specified in the GCS SLA |
+# (https://cloud.google.com/storage/sla) |
def GetNumRetries(): |
- return config.getint('Boto', 'num_retries', 6) |
+  """Returns the Boto 'num_retries' config setting, defaulting to 23.""" |
+  default_num_retries = 23 |
+  return config.getint('Boto', 'num_retries', default_num_retries) |
def GetMaxRetryDelay(): |
- return config.getint('Boto', 'max_retry_delay', 60) |
+  """Returns the Boto 'max_retry_delay' config setting, defaulting to 32.""" |
+  default_max_retry_delay = 32 |
+  return config.getint('Boto', 'max_retry_delay', default_max_retry_delay) |
# Resumable downloads and uploads make one HTTP call per chunk (and must be |
@@ -890,8 +954,12 @@ def HaveProviderUrls(args_to_check): |
return True |
return False |
+# Result type for the multiprocessing availability check. It must be defined |
+# at the module level for pickling across processes. |
+MultiprocessingIsAvailableResult = collections.namedtuple( |
+    typename='MultiprocessingIsAvailableResult', |
+    field_names=['is_available', 'stack_trace']) |
-def MultiprocessingIsAvailable(logger=None): |
+ |
+def CheckMultiprocessingAvailableAndInit(logger=None): |
"""Checks if multiprocessing is available. |
There are some environments in which there is no way to use multiprocessing |
@@ -900,6 +968,10 @@ def MultiprocessingIsAvailable(logger=None): |
needed to make sure the environment can support the pieces of the |
multiprocessing module that we need. |
+ If multiprocessing is available, this performs necessary initialization for |
+ multiprocessing. See gslib.command.InitializeMultiprocessingVariables for |
+ an explanation of why this is necessary. |
+ |
Args: |
logger: logging.logger to use for debug output. |
@@ -917,16 +989,27 @@ def MultiprocessingIsAvailable(logger=None): |
if logger: |
logger.debug(cached_multiprocessing_check_stack_trace) |
logger.warn(cached_multiprocessing_is_available_message) |
- return (cached_multiprocessing_is_available, |
- cached_multiprocessing_check_stack_trace) |
+ return MultiprocessingIsAvailableResult( |
+ is_available=cached_multiprocessing_is_available, |
+ stack_trace=cached_multiprocessing_check_stack_trace) |
+ |
+ if IS_WINDOWS: |
+ message = """ |
+Multiple processes are not supported on Windows. Operations requesting |
+parallelism will be executed with multiple threads in a single process only. |
+""" |
+ if logger: |
+ logger.warn(message) |
+ return MultiprocessingIsAvailableResult(is_available=False, |
+ stack_trace=None) |
stack_trace = None |
multiprocessing_is_available = True |
message = """ |
-You have requested multiple threads or processes for an operation, but the |
+You have requested multiple processes for an operation, but the |
required functionality of Python\'s multiprocessing module is not available. |
-Your operations will be performed sequentially, and any requests for |
-parallelism will be ignored. |
+Operations requesting parallelism will be executed with multiple threads in a |
+single process only. |
""" |
try: |
# Fails if /dev/shm (or some equivalent thereof) is not available for use |
@@ -934,8 +1017,7 @@ parallelism will be ignored. |
try: |
multiprocessing.Value('i', 0) |
except: |
- if not IS_WINDOWS: |
- message += """ |
+ message += """ |
Please ensure that you have write access to both /dev/shm and /run/shm. |
""" |
raise # We'll handle this in one place below. |
@@ -944,7 +1026,9 @@ Please ensure that you have write access to both /dev/shm and /run/shm. |
# out as a sanity check. This definitely works on some versions of Windows, |
# but it's certainly possible that there is some unknown configuration for |
# which it won't. |
- multiprocessing.Manager() |
+ global manager # pylint: disable=global-variable-undefined |
+ |
+ manager = multiprocessing.Manager() |
# Check that the max number of open files is reasonable. Always check this |
# after we're sure that the basic multiprocessing functionality is |
@@ -968,7 +1052,7 @@ Please ensure that you have write access to both /dev/shm and /run/shm. |
except AttributeError: |
pass |
- if limit < MIN_ACCEPTABLE_OPEN_FILES_LIMIT and not IS_WINDOWS: |
+ if limit < MIN_ACCEPTABLE_OPEN_FILES_LIMIT: |
message += (""" |
Your max number of open files, %s, is too low to allow safe multiprocessing. |
On Linux you can fix this by adding something like "ulimit -n 10000" to your |
@@ -995,7 +1079,9 @@ Alternatively, edit /etc/launchd.conf with something like: |
cached_multiprocessing_is_available = multiprocessing_is_available |
cached_multiprocessing_check_stack_trace = stack_trace |
cached_multiprocessing_is_available_message = message |
- return (multiprocessing_is_available, stack_trace) |
+ return MultiprocessingIsAvailableResult( |
+ is_available=cached_multiprocessing_is_available, |
+ stack_trace=cached_multiprocessing_check_stack_trace) |
def CreateLock(): |
@@ -1007,7 +1093,7 @@ def CreateLock(): |
Returns: |
Multiprocessing or threading lock. |
""" |
- if MultiprocessingIsAvailable()[0]: |
+ if CheckMultiprocessingAvailableAndInit().is_available: |
return manager.Lock() |
else: |
return threading.Lock() |