| Index: third_party/gsutil/gslib/commands/perfdiag.py
|
| diff --git a/third_party/gsutil/gslib/commands/perfdiag.py b/third_party/gsutil/gslib/commands/perfdiag.py
|
| index d88eae78c1242e823b9be169e8f2f9f103a15a23..f95545bdd85537ef30ed7db418ed6a1760492bc4 100644
|
| --- a/third_party/gsutil/gslib/commands/perfdiag.py
|
| +++ b/third_party/gsutil/gslib/commands/perfdiag.py
|
| @@ -18,6 +18,7 @@ from __future__ import absolute_import
|
|
|
| import calendar
|
| from collections import defaultdict
|
| +from collections import namedtuple
|
| import contextlib
|
| import cStringIO
|
| import datetime
|
| @@ -41,17 +42,21 @@ import boto.gs.connection
|
| import gslib
|
| from gslib.cloud_api import NotFoundException
|
| from gslib.cloud_api import ServiceException
|
| -from gslib.cloud_api_helper import GetDownloadSerializationDict
|
| +from gslib.cloud_api_helper import GetDownloadSerializationData
|
| from gslib.command import Command
|
| from gslib.command import DummyArgChecker
|
| from gslib.command_argument import CommandArgument
|
| from gslib.commands import config
|
| from gslib.cs_api_map import ApiSelector
|
| from gslib.exception import CommandException
|
| +from gslib.file_part import FilePart
|
| from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
|
| from gslib.storage_url import StorageUrlFromString
|
| from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
|
| +from gslib.util import CheckFreeSpace
|
| +from gslib.util import DivideAndCeil
|
| from gslib.util import GetCloudApiInstance
|
| +from gslib.util import GetFileSize
|
| from gslib.util import GetMaxRetryDelay
|
| from gslib.util import HumanReadableToBytes
|
| from gslib.util import IS_LINUX
|
| @@ -60,11 +65,11 @@ from gslib.util import MakeHumanReadable
|
| from gslib.util import Percentile
|
| from gslib.util import ResumableThreshold
|
|
|
| -
|
| _SYNOPSIS = """
|
| gsutil perfdiag [-i in.json]
|
| - gsutil perfdiag [-o out.json] [-n iterations] [-c processes]
|
| - [-k threads] [-s size] [-t tests] url...
|
| + gsutil perfdiag [-o out.json] [-n objects] [-c processes]
|
| + [-k threads] [-p parallelism type] [-y slices] [-s size] [-d directory]
|
| + [-t tests] url...
|
| """
|
|
|
| _DETAILED_HELP_TEXT = ("""
|
| @@ -99,9 +104,8 @@ _DETAILED_HELP_TEXT = ("""
|
|
|
|
|
| <B>OPTIONS</B>
|
| - -n Sets the number of iterations performed when downloading and
|
| - uploading files during latency and throughput tests. Defaults to
|
| - 5.
|
| + -n Sets the number of objects to use when downloading and uploading
|
| + files during tests. Defaults to 5.
|
|
|
| -c Sets the number of processes to use while running throughput
|
| experiments. The default value is 1.
|
| @@ -110,20 +114,55 @@ _DETAILED_HELP_TEXT = ("""
|
| throughput experiments. Each process will receive an equal number
|
| of threads. The default value is 1.
|
|
|
| - -s Sets the size (in bytes) of the test file used to perform read
|
| - and write throughput tests. The default is 1 MiB. This can also
|
| - be specified using byte suffixes such as 500K or 1M. Note: these
|
| - values are interpreted as multiples of 1024 (K=1024, M=1024*1024,
|
| - etc.)
|
| +           Note: All specified threads and processes will be created, but
|
| +           may not be saturated with work if too few objects (set with -n)
|
| +           or too few slices (set with -y) are requested.
|
| +
|
| + -p Sets the type of parallelism to be used (only applicable when
|
| + threads or processes are specified and threads * processes > 1).
|
| + The default is to use fan. Must be one of the following:
|
| +
|
| + fan
|
| + Use one thread per object. This is akin to using gsutil -m cp,
|
| + with sliced object download / parallel composite upload
|
| + disabled.
|
| +
|
| + slice
|
| +               Use Y (set with -y) threads for each object, transferring
|
| +               one object at a time. This is akin to using sliced object
|
| +               download / parallel composite upload, without -m. Sliced
|
| +               uploads are not supported for s3.
|
| +
|
| + both
|
| +               Use Y (set with -y) threads for each object, transferring
|
| +               multiple objects at a time. This is akin to simultaneously
|
| +               using sliced object download / parallel composite upload and
|
| +               gsutil -m cp. Sliced uploads are not supported for s3.
|
| +
|
| + -y Sets the number of slices to divide each file/object into while
|
| + transferring data. Only applicable with the slice (or both)
|
| + parallelism type. The default is 4 slices.
|
| +
|
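| +             For example, the following would run the wthru test on 8
|
| +             objects, using 4 threads and 4 slices per object:
|
| +
|
| +            gsutil perfdiag -n 8 -k 4 -p slice -y 4 -t wthru gs://bucketname
|
| +
|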
| + -s Sets the size (in bytes) for each of the N (set with -n) objects
|
| + used in the read and write throughput tests. The default is 1 MiB.
|
| + This can also be specified using byte suffixes such as 500K or 1M.
|
| + Note: these values are interpreted as multiples of 1024 (K=1024,
|
| + M=1024*1024, etc.)
|
| +              Note: If rthru_file or wthru_file is run, N (set with -n)
|
| + times as much disk space as specified will be required for the
|
| + operation.
|
| +
|
| + -d Sets the directory to store temporary local files in. If not
|
| + specified, a default temporary directory will be used.
|
|
|
| -t Sets the list of diagnostic tests to perform. The default is to
|
| - run all diagnostic tests. Must be a comma-separated list
|
| - containing one or more of the following:
|
| + run the lat, rthru, and wthru diagnostic tests. Must be a
|
| + comma-separated list containing one or more of the following:
|
|
|
| lat
|
| - Runs N iterations (set with -n) of writing the file,
|
| - retrieving its metadata, reading the file, and deleting
|
| - the file. Records the latency of each operation.
|
| + For N (set with -n) objects, write the object, retrieve its
|
| + metadata, read the object, and finally delete the object.
|
| + Record the latency of each operation.
|
|
|
| list
|
| Write N (set with -n) objects to the bucket, record how long
|
| @@ -136,15 +175,22 @@ _DETAILED_HELP_TEXT = ("""
|
| Runs N (set with -n) read operations, with at most C
|
| (set with -c) reads outstanding at any given time.
|
|
|
| + rthru_file
|
| + The same as rthru, but simultaneously writes data to the disk,
|
| + to gauge the performance impact of the local disk on downloads.
|
| +
|
| wthru
|
| Runs N (set with -n) write operations, with at most C
|
| (set with -c) writes outstanding at any given time.
|
|
|
| + wthru_file
|
| + The same as wthru, but simultaneously reads data from the disk,
|
| + to gauge the performance impact of the local disk on uploads.
|
| +
|
| -m Adds metadata to the result JSON file. Multiple -m values can be
|
| specified. Example:
|
|
|
| - gsutil perfdiag -m "key1:value1" -m "key2:value2" \
|
| - gs://bucketname/
|
| + gsutil perfdiag -m "key1:val1" -m "key2:val2" gs://bucketname
|
|
|
| Each metadata key will be added to the top-level "metadata"
|
| dictionary in the output JSON file.
|
| @@ -178,6 +224,41 @@ _DETAILED_HELP_TEXT = ("""
|
| this information will be sent to Google unless you choose to send it.
|
| """)
|
|
|
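| +# Describes a temporary file used by perfdiag: its size in bytes, its
|
| +# base64-encoded MD5 hash, and (optionally) its full contents as a string.
|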
| +FileDataTuple = namedtuple(
|
| + 'FileDataTuple',
|
| + 'size md5 data')
|
| +
|
| +# Describes one object in a fanned download. If need_to_slice is specified as
|
| +# True, the object should be downloaded with the slice strategy. Other field
|
| +# names are the same as documented in PerfDiagCommand.Download.
|
| +FanDownloadTuple = namedtuple(
|
| + 'FanDownloadTuple',
|
| + 'need_to_slice object_name file_name serialization_data')
|
| +
|
| +# Describes one slice in a sliced download.
|
| +# Field names are the same as documented in PerfDiagCommand.Download.
|
| +SliceDownloadTuple = namedtuple(
|
| + 'SliceDownloadTuple',
|
| + 'object_name file_name serialization_data start_byte end_byte')
|
| +
|
| +# Describes one file in a fanned upload. If need_to_slice is specified as
|
| +# True, the file should be uploaded with the slice strategy. Other field
|
| +# names are the same as documented in PerfDiagCommand.Upload.
|
| +FanUploadTuple = namedtuple(
|
| + 'FanUploadTuple',
|
| + 'need_to_slice file_name object_name use_file')
|
| +
|
| +# Describes one slice in a sliced upload.
|
| +# Field names are the same as documented in PerfDiagCommand.Upload.
|
| +SliceUploadTuple = namedtuple(
|
| + 'SliceUploadTuple',
|
| + 'file_name object_name use_file file_start file_size')
|
| +
|
| +# Dict storing file_path:FileDataTuple for each temporary file used by
|
| +# perfdiag. This data should be kept outside of the PerfDiagCommand class
|
| +# since calls to Apply will make copies of all member data.
|
| +temp_file_dict = {}
|
| +
|
|
|
| class Error(Exception):
|
| """Base exception class for this module."""
|
| @@ -189,16 +270,73 @@ class InvalidArgument(Error):
|
| pass
|
|
|
|
|
| -def _DownloadWrapper(cls, arg, thread_state=None):
|
| - cls.Download(arg, thread_state=thread_state)
|
| +def _DownloadObject(cls, args, thread_state=None):
|
| + """Function argument to apply for performing fanned parallel downloads.
|
|
|
| + Args:
|
| + cls: The calling PerfDiagCommand class instance.
|
| + args: A FanDownloadTuple object describing this download.
|
| + thread_state: gsutil Cloud API instance to use for the operation.
|
| + """
|
| + cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
|
| + if args.need_to_slice:
|
| + cls.PerformSlicedDownload(args.object_name, args.file_name,
|
| + args.serialization_data)
|
| + else:
|
| + cls.Download(args.object_name, args.file_name, args.serialization_data)
|
|
|
| -def _UploadWrapper(cls, arg, thread_state=None):
|
| - cls.Upload(arg, thread_state=thread_state)
|
|
|
| +def _DownloadSlice(cls, args, thread_state=None):
|
| + """Function argument to apply for performing sliced downloads.
|
|
|
| -def _DeleteWrapper(cls, arg, thread_state=None):
|
| - cls.Delete(arg, thread_state=thread_state)
|
| + Args:
|
| + cls: The calling PerfDiagCommand class instance.
|
| + args: A SliceDownloadTuple object describing this download.
|
| + thread_state: gsutil Cloud API instance to use for the operation.
|
| + """
|
| + cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
|
| + cls.Download(args.object_name, args.file_name, args.serialization_data,
|
| + args.start_byte, args.end_byte)
|
| +
|
| +
|
| +def _UploadObject(cls, args, thread_state=None):
|
| + """Function argument to apply for performing fanned parallel uploads.
|
| +
|
| + Args:
|
| + cls: The calling PerfDiagCommand class instance.
|
| + args: A FanUploadTuple object describing this upload.
|
| + thread_state: gsutil Cloud API instance to use for the operation.
|
| + """
|
| + cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
|
| + if args.need_to_slice:
|
| + cls.PerformSlicedUpload(args.file_name, args.object_name, args.use_file)
|
| + else:
|
| + cls.Upload(args.file_name, args.object_name, args.use_file)
|
| +
|
| +
|
| +def _UploadSlice(cls, args, thread_state=None):
|
| + """Function argument to apply for performing sliced parallel uploads.
|
| +
|
| + Args:
|
| + cls: The calling PerfDiagCommand class instance.
|
| + args: A SliceUploadTuple object describing this upload.
|
| + thread_state: gsutil Cloud API instance to use for the operation.
|
| + """
|
| + cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
|
| + cls.Upload(args.file_name, args.object_name, args.use_file,
|
| + args.file_start, args.file_size)
|
| +
|
| +
|
| +def _DeleteWrapper(cls, object_name, thread_state=None):
|
| + """Function argument to apply for performing parallel object deletions.
|
| +
|
| + Args:
|
| + cls: The calling PerfDiagCommand class instance.
|
| + object_name: The object name to delete from the test bucket.
|
| + thread_state: gsutil Cloud API instance to use for the operation.
|
| + """
|
| + cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
|
| + cls.Delete(object_name)
|
|
|
|
|
| def _PerfdiagExceptionHandler(cls, e):
|
| @@ -216,6 +354,9 @@ class DummyFile(object):
|
| def write(self, *args, **kwargs): # pylint: disable=invalid-name
|
| pass
|
|
|
| + def close(self): # pylint: disable=invalid-name
|
| + pass
|
| +
|
|
|
| # Many functions in perfdiag re-define a temporary function based on a
|
| # variable from a loop, resulting in a false positive from the linter.
|
| @@ -230,7 +371,7 @@ class PerfDiagCommand(Command):
|
| usage_synopsis=_SYNOPSIS,
|
| min_args=0,
|
| max_args=1,
|
| - supported_sub_args='n:c:k:s:t:m:i:o:',
|
| + supported_sub_args='n:c:k:p:y:s:d:t:m:i:o:',
|
| file_url_ok=False,
|
| provider_url_ok=False,
|
| urls_start_arg=0,
|
| @@ -253,7 +394,7 @@ class PerfDiagCommand(Command):
|
| # Byte sizes to use for latency testing files.
|
| # TODO: Consider letting the user specify these sizes with a configuration
|
| # parameter.
|
| - test_file_sizes = (
|
| + test_lat_file_sizes = (
|
| 0, # 0 bytes
|
| 1024, # 1 KiB
|
| 102400, # 100 KiB
|
| @@ -262,15 +403,26 @@ class PerfDiagCommand(Command):
|
|
|
| # Test names.
|
| RTHRU = 'rthru'
|
| + RTHRU_FILE = 'rthru_file'
|
| WTHRU = 'wthru'
|
| + WTHRU_FILE = 'wthru_file'
|
| LAT = 'lat'
|
| LIST = 'list'
|
|
|
| + # Parallelism strategies.
|
| + FAN = 'fan'
|
| + SLICE = 'slice'
|
| + BOTH = 'both'
|
| +
|
| # List of all diagnostic tests.
|
| - ALL_DIAG_TESTS = (RTHRU, WTHRU, LAT, LIST)
|
| + ALL_DIAG_TESTS = (RTHRU, RTHRU_FILE, WTHRU, WTHRU_FILE, LAT, LIST)
|
| +
|
| # List of diagnostic tests to run by default.
|
| DEFAULT_DIAG_TESTS = (RTHRU, WTHRU, LAT)
|
|
|
| + # List of parallelism strategies.
|
| + PARALLEL_STRATEGIES = (FAN, SLICE, BOTH)
|
| +
|
| # Google Cloud Storage XML API endpoint host.
|
| XML_API_HOST = boto.config.get(
|
| 'Credentials', 'gs_host', boto.gs.connection.GSConnection.DefaultHost)
|
| @@ -324,21 +476,58 @@ class PerfDiagCommand(Command):
|
| "subprocess '%s'." % (p.returncode, ' '.join(cmd)))
|
| return stdoutdata if return_output else p.returncode
|
|
|
| + def _WarnIfLargeData(self):
|
| + """Outputs a warning message if a large amount of data is being used."""
|
| + if self.num_objects * self.thru_filesize > HumanReadableToBytes('2GiB'):
|
| + self.logger.info('This is a large operation, and could take a while.')
|
| +
|
| + def _MakeTempFile(self, file_size=0, mem_metadata=False,
|
| + mem_data=False, prefix='gsutil_test_file'):
|
| + """Creates a temporary file of the given size and returns its path.
|
| +
|
| + Args:
|
| + file_size: The size of the temporary file to create.
|
| +      mem_metadata: If true, store the md5 hash and file size in memory at
|
| +          temp_file_dict[fpath].md5 and temp_file_dict[fpath].size.
|
| +      mem_data: If true, store the file data in memory at
|
| +          temp_file_dict[fpath].data.
|
| + prefix: The prefix to use for the temporary file. Defaults to
|
| + gsutil_test_file.
|
| +
|
| + Returns:
|
| + The file path of the created temporary file.
|
| + """
|
| + fd, fpath = tempfile.mkstemp(suffix='.bin', prefix=prefix,
|
| + dir=self.directory, text=False)
|
| + with os.fdopen(fd, 'wb') as fp:
|
| + random_bytes = os.urandom(min(file_size,
|
| + self.MAX_UNIQUE_RANDOM_BYTES))
|
| + total_bytes_written = 0
|
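| +      # If file_size exceeds MAX_UNIQUE_RANDOM_BYTES, repeat the random
|
| +      # data so that creating large files does not require as much entropy.
|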
| + while total_bytes_written < file_size:
|
| + num_bytes = min(self.MAX_UNIQUE_RANDOM_BYTES,
|
| + file_size - total_bytes_written)
|
| + fp.write(random_bytes[:num_bytes])
|
| + total_bytes_written += num_bytes
|
| +
|
| + if mem_metadata or mem_data:
|
| + with open(fpath, 'rb') as fp:
|
| + file_size = GetFileSize(fp) if mem_metadata else None
|
| + md5 = CalculateB64EncodedMd5FromContents(fp) if mem_metadata else None
|
| + data = fp.read() if mem_data else None
|
| + temp_file_dict[fpath] = FileDataTuple(file_size, md5, data)
|
| +
|
| + self.temporary_files.add(fpath)
|
| + return fpath
|
| +
|
| def _SetUp(self):
|
| """Performs setup operations needed before diagnostics can be run."""
|
|
|
| # Stores test result data.
|
| self.results = {}
|
| - # List of test files in a temporary location on disk for latency ops.
|
| - self.latency_files = []
|
| - # List of test objects to clean up in the test bucket.
|
| - self.test_object_names = set()
|
| - # Maps each test file path to its size in bytes.
|
| - self.file_sizes = {}
|
| - # Maps each test file to its contents as a string.
|
| - self.file_contents = {}
|
| - # Maps each test file to its MD5 hash.
|
| - self.file_md5s = {}
|
| + # Set of file paths for local temporary files.
|
| + self.temporary_files = set()
|
| + # Set of names for test objects that exist in the test bucket.
|
| + self.temporary_objects = set()
|
| # Total number of HTTP requests made.
|
| self.total_requests = 0
|
| # Total number of HTTP 5xx errors.
|
| @@ -347,63 +536,86 @@ class PerfDiagCommand(Command):
|
| self.error_responses_by_code = defaultdict(int)
|
| # Total number of socket errors.
|
| self.connection_breaks = 0
|
| -
|
| - def _MakeFile(file_size):
|
| - """Creates a temporary file of the given size and returns its path."""
|
| - fd, fpath = tempfile.mkstemp(suffix='.bin', prefix='gsutil_test_file',
|
| - text=False)
|
| - self.file_sizes[fpath] = file_size
|
| - random_bytes = os.urandom(min(file_size, self.MAX_UNIQUE_RANDOM_BYTES))
|
| - total_bytes = 0
|
| - file_contents = ''
|
| - while total_bytes < file_size:
|
| - num_bytes = min(self.MAX_UNIQUE_RANDOM_BYTES, file_size - total_bytes)
|
| - file_contents += random_bytes[:num_bytes]
|
| - total_bytes += num_bytes
|
| - self.file_contents[fpath] = file_contents
|
| - with os.fdopen(fd, 'wb') as f:
|
| - f.write(self.file_contents[fpath])
|
| - with open(fpath, 'rb') as f:
|
| - self.file_md5s[fpath] = CalculateB64EncodedMd5FromContents(f)
|
| - return fpath
|
| -
|
| - # Create files for latency tests.
|
| - for file_size in self.test_file_sizes:
|
| - fpath = _MakeFile(file_size)
|
| - self.latency_files.append(fpath)
|
| -
|
| - # Creating a file for warming up the TCP connection.
|
| - self.tcp_warmup_file = _MakeFile(5 * 1024 * 1024) # 5 Mebibytes.
|
| - # Remote file to use for TCP warmup.
|
| - self.tcp_warmup_remote_file = (str(self.bucket_url) +
|
| - os.path.basename(self.tcp_warmup_file))
|
| -
|
| - # Local file on disk for write throughput tests.
|
| - self.thru_local_file = _MakeFile(self.thru_filesize)
|
| + # Boolean to prevent doing cleanup twice.
|
| + self.teardown_completed = False
|
| +
|
| + # Create files for latency test.
|
| + if self.LAT in self.diag_tests:
|
| + self.latency_files = []
|
| + for file_size in self.test_lat_file_sizes:
|
| + fpath = self._MakeTempFile(file_size, mem_metadata=True, mem_data=True)
|
| + self.latency_files.append(fpath)
|
| +
|
| + # Create files for throughput tests.
|
| + if self.diag_tests.intersection(
|
| + (self.RTHRU, self.WTHRU, self.RTHRU_FILE, self.WTHRU_FILE)):
|
| + # Create a file for warming up the TCP connection.
|
| + self.tcp_warmup_file = self._MakeTempFile(
|
| + 5 * 1024 * 1024, mem_metadata=True, mem_data=True)
|
| +
|
| +    # In-memory throughput tests transfer the same object N times instead
|
| +    # of creating N objects, in order to avoid excessive memory usage.
|
| + if self.diag_tests.intersection((self.RTHRU, self.WTHRU)):
|
| + self.mem_thru_file_name = self._MakeTempFile(
|
| + self.thru_filesize, mem_metadata=True, mem_data=True)
|
| + self.mem_thru_object_name = os.path.basename(self.mem_thru_file_name)
|
| +
|
| +    # For tests that use disk I/O, it is necessary to create N objects
|
| +    # in order to properly measure the performance impact of seeks.
|
| + if self.diag_tests.intersection((self.RTHRU_FILE, self.WTHRU_FILE)):
|
| + # List of file names and corresponding object names to use for file
|
| + # throughput tests.
|
| + self.thru_file_names = []
|
| + self.thru_object_names = []
|
| +
|
| + free_disk_space = CheckFreeSpace(self.directory)
|
| + if free_disk_space >= self.thru_filesize * self.num_objects:
|
| + self.logger.info('\nCreating %d local files each of size %s.'
|
| + % (self.num_objects,
|
| + MakeHumanReadable(self.thru_filesize)))
|
| + self._WarnIfLargeData()
|
| + for _ in range(self.num_objects):
|
| + file_name = self._MakeTempFile(self.thru_filesize,
|
| + mem_metadata=True)
|
| + self.thru_file_names.append(file_name)
|
| + self.thru_object_names.append(os.path.basename(file_name))
|
| + else:
|
| + raise CommandException(
|
| + 'Not enough free disk space for throughput files: '
|
| + '%s of disk space required, but only %s available.'
|
| + % (MakeHumanReadable(self.thru_filesize * self.num_objects),
|
| + MakeHumanReadable(free_disk_space)))
|
|
|
| # Dummy file buffer to use for downloading that goes nowhere.
|
| self.discard_sink = DummyFile()
|
|
|
| + # Filter out misleading progress callback output and the incorrect
|
| + # suggestion to use gsutil -m perfdiag.
|
| + self.logger.addFilter(self._PerfdiagFilter())
|
| +
|
| def _TearDown(self):
|
| """Performs operations to clean things up after performing diagnostics."""
|
| - for fpath in self.latency_files + [self.thru_local_file,
|
| - self.tcp_warmup_file]:
|
| + if not self.teardown_completed:
|
| + temp_file_dict.clear()
|
| +
|
| try:
|
| - os.remove(fpath)
|
| + for fpath in self.temporary_files:
|
| + os.remove(fpath)
|
| + if self.delete_directory:
|
| + os.rmdir(self.directory)
|
| except OSError:
|
| pass
|
|
|
| - for object_name in self.test_object_names:
|
| -
|
| - def _Delete():
|
| - try:
|
| - self.gsutil_api.DeleteObject(self.bucket_url.bucket_name,
|
| - object_name,
|
| - provider=self.provider)
|
| - except NotFoundException:
|
| - pass
|
| -
|
| - self._RunOperation(_Delete)
|
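| +    # Delete leftover test objects, in parallel when the run was
|
| +    # configured with multiple threads or processes.
|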
| + if self.threads > 1 or self.processes > 1:
|
| +      args = list(self.temporary_objects)
|
| + self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler,
|
| + arg_checker=DummyArgChecker,
|
| + parallel_operations_override=True,
|
| + process_count=self.processes, thread_count=self.threads)
|
| + else:
|
| + for object_name in self.temporary_objects:
|
| + self.Delete(object_name)
|
| + self.teardown_completed = True
|
|
|
| @contextlib.contextmanager
|
| def _Time(self, key, bucket):
|
| @@ -478,12 +690,13 @@ class PerfDiagCommand(Command):
|
| # Stores timing information for each category of operation.
|
| self.results['latency'] = defaultdict(list)
|
|
|
| - for i in range(self.num_iterations):
|
| + for i in range(self.num_objects):
|
| self.logger.info('\nRunning latency iteration %d...', i+1)
|
| for fpath in self.latency_files:
|
| + file_data = temp_file_dict[fpath]
|
| url = self.bucket_url.Clone()
|
| url.object_name = os.path.basename(fpath)
|
| - file_size = self.file_sizes[fpath]
|
| + file_size = file_data.size
|
| readable_file_size = MakeHumanReadable(file_size)
|
|
|
| self.logger.info(
|
| @@ -493,7 +706,7 @@ class PerfDiagCommand(Command):
|
| upload_target = StorageUrlToUploadObjectMetadata(url)
|
|
|
| def _Upload():
|
| - io_fp = cStringIO.StringIO(self.file_contents[fpath])
|
| + io_fp = cStringIO.StringIO(file_data.data)
|
| with self._Time('UPLOAD_%d' % file_size, self.results['latency']):
|
| self.gsutil_api.UploadObject(
|
| io_fp, upload_target, size=file_size, provider=self.provider,
|
| @@ -508,8 +721,7 @@ class PerfDiagCommand(Command):
|
| 'mediaLink', 'size'])
|
| # Download will get the metadata first if we don't pass it in.
|
| download_metadata = self._RunOperation(_Metadata)
|
| - serialization_dict = GetDownloadSerializationDict(download_metadata)
|
| - serialization_data = json.dumps(serialization_dict)
|
| + serialization_data = GetDownloadSerializationData(download_metadata)
|
|
|
| def _Download():
|
| with self._Time('DOWNLOAD_%d' % file_size, self.results['latency']):
|
| @@ -524,213 +736,308 @@ class PerfDiagCommand(Command):
|
| provider=self.provider)
|
| self._RunOperation(_Delete)
|
|
|
| - class _CpFilter(logging.Filter):
|
| + class _PerfdiagFilter(logging.Filter):
|
|
|
| def filter(self, record):
|
| - # Used to prevent cp._LogCopyOperation from spewing output from
|
| - # subprocesses about every iteration.
|
| + # Used to prevent unnecessary output when using multiprocessing.
|
| msg = record.getMessage()
|
| return not (('Copying file:///' in msg) or ('Copying gs://' in msg) or
|
| - ('Computing CRC' in msg))
|
| + ('Computing CRC' in msg) or ('gsutil -m perfdiag' in msg))
|
|
|
| def _PerfdiagExceptionHandler(self, e):
|
| """Simple exception handler to allow post-completion status."""
|
| self.logger.error(str(e))
|
|
|
| - def _RunReadThruTests(self):
|
| - """Runs read throughput tests."""
|
| - self.logger.info(
|
| - '\nRunning read throughput tests (%s iterations of size %s)' %
|
| - (self.num_iterations, MakeHumanReadable(self.thru_filesize)))
|
| -
|
| - self.results['read_throughput'] = {'file_size': self.thru_filesize,
|
| - 'num_times': self.num_iterations,
|
| - 'processes': self.processes,
|
| - 'threads': self.threads}
|
| -
|
| - # Copy the TCP warmup file.
|
| - warmup_url = self.bucket_url.Clone()
|
| - warmup_url.object_name = os.path.basename(self.tcp_warmup_file)
|
| - warmup_target = StorageUrlToUploadObjectMetadata(warmup_url)
|
| - self.test_object_names.add(warmup_url.object_name)
|
| -
|
| - def _Upload1():
|
| - self.gsutil_api.UploadObject(
|
| - cStringIO.StringIO(self.file_contents[self.tcp_warmup_file]),
|
| - warmup_target, provider=self.provider, fields=['name'])
|
| - self._RunOperation(_Upload1)
|
| -
|
| - # Copy the file to remote location before reading.
|
| - thru_url = self.bucket_url.Clone()
|
| - thru_url.object_name = os.path.basename(self.thru_local_file)
|
| - thru_target = StorageUrlToUploadObjectMetadata(thru_url)
|
| - thru_target.md5Hash = self.file_md5s[self.thru_local_file]
|
| - self.test_object_names.add(thru_url.object_name)
|
| -
|
| - # Get the mediaLink here so that we can pass it to download.
|
| - def _Upload2():
|
| - return self.gsutil_api.UploadObject(
|
| - cStringIO.StringIO(self.file_contents[self.thru_local_file]),
|
| - thru_target, provider=self.provider, size=self.thru_filesize,
|
| - fields=['name', 'mediaLink', 'size'])
|
| -
|
| - # Get the metadata for the object so that we are just measuring performance
|
| - # on the actual bytes transfer.
|
| - download_metadata = self._RunOperation(_Upload2)
|
| - serialization_dict = GetDownloadSerializationDict(download_metadata)
|
| - serialization_data = json.dumps(serialization_dict)
|
| + def PerformFannedDownload(self, need_to_slice, object_names, file_names,
|
| + serialization_data):
|
| + """Performs a parallel download of multiple objects using the fan strategy.
|
|
|
| - if self.processes == 1 and self.threads == 1:
|
| + Args:
|
| + need_to_slice: If True, additionally apply the slice strategy to each
|
| + object in object_names.
|
| + object_names: A list of object names to be downloaded. Each object must
|
| + already exist in the test bucket.
|
| + file_names: A list, corresponding by index to object_names, of file names
|
| + for downloaded data. If None, discard downloaded data.
|
| + serialization_data: A list, corresponding by index to object_names,
|
| + of serialization data for each object.
|
| + """
|
| + args = []
|
| + for i in range(len(object_names)):
|
| + file_name = file_names[i] if file_names else None
|
| + args.append(FanDownloadTuple(
|
| + need_to_slice, object_names[i], file_name,
|
| + serialization_data[i]))
|
| + self.Apply(_DownloadObject, args, _PerfdiagExceptionHandler,
|
| + ('total_requests', 'request_errors'),
|
| + arg_checker=DummyArgChecker, parallel_operations_override=True,
|
| + process_count=self.processes, thread_count=self.threads)
|
| +
|
| + def PerformSlicedDownload(self, object_name, file_name, serialization_data):
|
| + """Performs a download of an object using the slice strategy.
|
|
|
| - # Warm up the TCP connection.
|
| - def _Warmup():
|
| - self.gsutil_api.GetObjectMedia(warmup_url.bucket_name,
|
| - warmup_url.object_name,
|
| - self.discard_sink,
|
| - provider=self.provider)
|
| - self._RunOperation(_Warmup)
|
| + Args:
|
| + object_name: The name of the object to download.
|
| + file_name: The name of the file to download data to, or None if data
|
| + should be discarded.
|
| + serialization_data: The serialization data for the object.
|
| + """
|
| + if file_name:
|
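| +      # Pre-size the destination file so that each slice can seek to and
|
| +      # write its own byte range independently.
|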
| + with open(file_name, 'ab') as fp:
|
| + fp.truncate(self.thru_filesize)
|
| + component_size = DivideAndCeil(self.thru_filesize, self.num_slices)
|
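| +    # Assign each slice a contiguous byte range; the final slice may be
|
| +    # shorter when the object size is not evenly divisible.
|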
| + args = []
|
| + for i in range(self.num_slices):
|
| + start_byte = i * component_size
|
| +      end_byte = min((i + 1) * component_size - 1, self.thru_filesize - 1)
|
| +      args.append(SliceDownloadTuple(
|
| +          object_name, file_name, serialization_data, start_byte, end_byte))
|
| + self.Apply(_DownloadSlice, args, _PerfdiagExceptionHandler,
|
| + ('total_requests', 'request_errors'),
|
| + arg_checker=DummyArgChecker, parallel_operations_override=True,
|
| + process_count=self.processes, thread_count=self.threads)
|
| +
|
| + def PerformFannedUpload(self, need_to_slice, file_names, object_names,
|
| + use_file):
|
| + """Performs a parallel upload of multiple files using the fan strategy.
|
| +
|
| +    The metadata for each file in file_names should be present in
|
| +    temp_file_dict prior to calling. Also, the data for each file should
|
| +    be present in temp_file_dict if use_file is specified as False.
|
|
|
| - times = []
|
| + Args:
|
| + need_to_slice: If True, additionally apply the slice strategy to each
|
| + file in file_names.
|
| + file_names: A list of file names to be uploaded.
|
| +      object_names: A list, corresponding by index to file_names, of object
|
| + names to upload data to.
|
| + use_file: If true, use disk I/O, otherwise read upload data from memory.
|
| + """
|
| + args = []
|
| + for i in range(len(file_names)):
|
| + args.append(FanUploadTuple(
|
| + need_to_slice, file_names[i], object_names[i], use_file))
|
| + self.Apply(_UploadObject, args, _PerfdiagExceptionHandler,
|
| + ('total_requests', 'request_errors'),
|
| + arg_checker=DummyArgChecker, parallel_operations_override=True,
|
| + process_count=self.processes, thread_count=self.threads)
|
| +
|
| + def PerformSlicedUpload(self, file_name, object_name, use_file):
|
| + """Performs a parallel upload of a file using the slice strategy.
|
| +
|
| +    The metadata for file_name should be present in temp_file_dict prior
|
| +    to calling. Also, the data for file_name should be present in
|
| +    temp_file_dict if use_file is specified as False.
|
|
|
| - def _Download():
|
| - t0 = time.time()
|
| - self.gsutil_api.GetObjectMedia(
|
| - thru_url.bucket_name, thru_url.object_name, self.discard_sink,
|
| - provider=self.provider, serialization_data=serialization_data)
|
| - t1 = time.time()
|
| - times.append(t1 - t0)
|
| - for _ in range(self.num_iterations):
|
| - self._RunOperation(_Download)
|
| - time_took = sum(times)
|
| + Args:
|
| + file_name: The name of the file to upload.
|
| + object_name: The name of the object to upload to.
|
| + use_file: If true, use disk I/O, otherwise read upload data from memory.
|
| + """
|
| + # Divide the file into components.
|
| + component_size = DivideAndCeil(self.thru_filesize, self.num_slices)
|
| + component_object_names = (
|
| + [object_name + str(i) for i in range(self.num_slices)])
|
| +
|
| + args = []
|
| + for i in range(self.num_slices):
|
| + component_start = i * component_size
|
| +      # Track the possibly truncated final component size separately so
|
| +      # that component_start is always computed from the nominal size.
|
| +      this_component_size = min(
|
| +          component_size, temp_file_dict[file_name].size - component_start)
|
| +      args.append(SliceUploadTuple(file_name, component_object_names[i],
|
| +                                   use_file, component_start,
|
| +                                   this_component_size))
|
| +
|
| + # Upload the components in parallel.
|
| + try:
|
| + self.Apply(_UploadSlice, args, _PerfdiagExceptionHandler,
|
| + ('total_requests', 'request_errors'),
|
| + arg_checker=DummyArgChecker, parallel_operations_override=True,
|
| + process_count=self.processes, thread_count=self.threads)
|
| +
|
| + # Compose the components into an object.
|
| + request_components = []
|
| + for i in range(self.num_slices):
|
| + src_obj_metadata = (
|
| + apitools_messages.ComposeRequest.SourceObjectsValueListEntry(
|
| + name=component_object_names[i]))
|
| + request_components.append(src_obj_metadata)
|
| +
|
| + dst_obj_metadata = apitools_messages.Object()
|
| + dst_obj_metadata.name = object_name
|
| + dst_obj_metadata.bucket = self.bucket_url.bucket_name
|
| + def _Compose():
|
| + self.gsutil_api.ComposeObject(request_components, dst_obj_metadata,
|
| + provider=self.provider)
|
| + self._RunOperation(_Compose)
|
| + finally:
|
| + # Delete the temporary components.
|
| + self.Apply(_DeleteWrapper, component_object_names,
|
| + _PerfdiagExceptionHandler,
|
| + ('total_requests', 'request_errors'),
|
| + arg_checker=DummyArgChecker, parallel_operations_override=True,
|
| + process_count=self.processes, thread_count=self.threads)
|
| +
|
| + def _RunReadThruTests(self, use_file=False):
|
| + """Runs read throughput tests."""
|
| + test_name = 'read_throughput_file' if use_file else 'read_throughput'
|
| + file_io_string = 'with file I/O' if use_file else ''
|
| + self.logger.info(
|
| + '\nRunning read throughput tests %s (%s objects of size %s)' %
|
| + (file_io_string, self.num_objects,
|
| + MakeHumanReadable(self.thru_filesize)))
|
| + self._WarnIfLargeData()
|
| +
|
| + self.results[test_name] = {'file_size': self.thru_filesize,
|
| + 'processes': self.processes,
|
| + 'threads': self.threads,
|
| + 'parallelism': self.parallel_strategy
|
| + }
|
| +
|
| + # Copy the file(s) to the test bucket, and also get the serialization data
|
| + # so that we can pass it to download.
|
| + if use_file:
|
| +      # For the file I/O test, use N files on disk to preserve seek
|
| +      # performance.
|
| + file_names = self.thru_file_names
|
| + object_names = self.thru_object_names
|
| + serialization_data = []
|
| + for i in range(self.num_objects):
|
| + self.temporary_objects.add(self.thru_object_names[i])
|
| + if self.WTHRU_FILE in self.diag_tests:
|
| + # If we ran the WTHRU_FILE test, then the objects already exist.
|
| + obj_metadata = self.gsutil_api.GetObjectMetadata(
|
| + self.bucket_url.bucket_name, self.thru_object_names[i],
|
| + fields=['size', 'mediaLink'], provider=self.bucket_url.scheme)
|
| + else:
|
| + obj_metadata = self.Upload(self.thru_file_names[i],
|
| + self.thru_object_names[i], use_file)
|
| +
|
| + # File overwrite causes performance issues with sliced downloads.
|
| + # Delete the file and reopen it for download. This matches what a real
|
| + # download would look like.
|
| + os.unlink(self.thru_file_names[i])
|
| + open(self.thru_file_names[i], 'ab').close()
|
| + serialization_data.append(GetDownloadSerializationData(obj_metadata))
|
| else:
|
| - args = ([(thru_url.bucket_name, thru_url.object_name, serialization_data)]
|
| - * self.num_iterations)
|
| - self.logger.addFilter(self._CpFilter())
|
| +      # For the in-memory test, use only one file but copy it num_objects
|
| +      # times, so the test scales without excessive memory usage.
|
| + self.temporary_objects.add(self.mem_thru_object_name)
|
| + obj_metadata = self.Upload(self.mem_thru_file_name,
|
| + self.mem_thru_object_name, use_file)
|
| + file_names = None
|
| + object_names = [self.mem_thru_object_name] * self.num_objects
|
| + serialization_data = (
|
| + [GetDownloadSerializationData(obj_metadata)] * self.num_objects)
|
| +
|
| +    # Warm up the TCP connection.
|
| + warmup_obj_name = os.path.basename(self.tcp_warmup_file)
|
| + self.temporary_objects.add(warmup_obj_name)
|
| + self.Upload(self.tcp_warmup_file, warmup_obj_name)
|
| + self.Download(warmup_obj_name)
|
|
|
| - t0 = time.time()
|
| - self.Apply(_DownloadWrapper,
|
| - args,
|
| - _PerfdiagExceptionHandler,
|
| - arg_checker=DummyArgChecker,
|
| - parallel_operations_override=True,
|
| - process_count=self.processes,
|
| - thread_count=self.threads)
|
| - t1 = time.time()
|
| - time_took = t1 - t0
|
| + t0 = time.time()
|
| + if self.processes == 1 and self.threads == 1:
|
| + for i in range(self.num_objects):
|
| + file_name = file_names[i] if use_file else None
|
| + self.Download(object_names[i], file_name, serialization_data[i])
|
| + else:
|
| + if self.parallel_strategy in (self.FAN, self.BOTH):
|
| + need_to_slice = (self.parallel_strategy == self.BOTH)
|
| + self.PerformFannedDownload(need_to_slice, object_names, file_names,
|
| + serialization_data)
|
| + elif self.parallel_strategy == self.SLICE:
|
| + for i in range(self.num_objects):
|
| + file_name = file_names[i] if use_file else None
|
| + self.PerformSlicedDownload(
|
| + object_names[i], file_name, serialization_data[i])
|
| + t1 = time.time()
|
|
|
| - total_bytes_copied = self.thru_filesize * self.num_iterations
|
| + time_took = t1 - t0
|
| + total_bytes_copied = self.thru_filesize * self.num_objects
|
| bytes_per_second = total_bytes_copied / time_took
|
|
|
| - self.results['read_throughput']['time_took'] = time_took
|
| - self.results['read_throughput']['total_bytes_copied'] = total_bytes_copied
|
| - self.results['read_throughput']['bytes_per_second'] = bytes_per_second
|
| + self.results[test_name]['time_took'] = time_took
|
| + self.results[test_name]['total_bytes_copied'] = total_bytes_copied
|
| + self.results[test_name]['bytes_per_second'] = bytes_per_second
|
|
|
| - def _RunWriteThruTests(self):
|
| + def _RunWriteThruTests(self, use_file=False):
|
| """Runs write throughput tests."""
|
| + test_name = 'write_throughput_file' if use_file else 'write_throughput'
|
| + file_io_string = 'with file I/O' if use_file else ''
|
| self.logger.info(
|
| - '\nRunning write throughput tests (%s iterations of size %s)' %
|
| - (self.num_iterations, MakeHumanReadable(self.thru_filesize)))
|
| -
|
| - self.results['write_throughput'] = {'file_size': self.thru_filesize,
|
| - 'num_copies': self.num_iterations,
|
| - 'processes': self.processes,
|
| - 'threads': self.threads}
|
| -
|
| - warmup_url = self.bucket_url.Clone()
|
| - warmup_url.object_name = os.path.basename(self.tcp_warmup_file)
|
| - warmup_target = StorageUrlToUploadObjectMetadata(warmup_url)
|
| - self.test_object_names.add(warmup_url.object_name)
|
| -
|
| - thru_url = self.bucket_url.Clone()
|
| - thru_url.object_name = os.path.basename(self.thru_local_file)
|
| - thru_target = StorageUrlToUploadObjectMetadata(thru_url)
|
| - thru_tuples = []
|
| - for i in xrange(self.num_iterations):
|
| - # Create a unique name for each uploaded object. Otherwise,
|
| - # the XML API would fail when trying to non-atomically get metadata
|
| - # for the object that gets blown away by the overwrite.
|
| - remote_object_name = thru_target.name + str(i)
|
| - self.test_object_names.add(remote_object_name)
|
| - thru_tuples.append(UploadObjectTuple(thru_target.bucket,
|
| - remote_object_name,
|
| - filepath=self.thru_local_file))
|
| -
|
| - if self.processes == 1 and self.threads == 1:
|
| - # Warm up the TCP connection.
|
| - def _Warmup():
|
| - self.gsutil_api.UploadObject(
|
| - cStringIO.StringIO(self.file_contents[self.tcp_warmup_file]),
|
| - warmup_target, provider=self.provider, size=self.thru_filesize,
|
| - fields=['name'])
|
| - self._RunOperation(_Warmup)
|
| -
|
| - times = []
|
| -
|
| - for i in xrange(self.num_iterations):
|
| - thru_tuple = thru_tuples[i]
|
| - def _Upload():
|
| - """Uploads the write throughput measurement object."""
|
| - upload_target = apitools_messages.Object(
|
| - bucket=thru_tuple.bucket_name, name=thru_tuple.object_name,
|
| - md5Hash=thru_tuple.md5)
|
| - io_fp = cStringIO.StringIO(self.file_contents[self.thru_local_file])
|
| - t0 = time.time()
|
| - if self.thru_filesize < ResumableThreshold():
|
| - self.gsutil_api.UploadObject(
|
| - io_fp, upload_target, provider=self.provider,
|
| - size=self.thru_filesize, fields=['name'])
|
| - else:
|
| - self.gsutil_api.UploadObjectResumable(
|
| - io_fp, upload_target, provider=self.provider,
|
| - size=self.thru_filesize, fields=['name'],
|
| - tracker_callback=_DummyTrackerCallback)
|
| -
|
| - t1 = time.time()
|
| - times.append(t1 - t0)
|
| + '\nRunning write throughput tests %s (%s objects of size %s)' %
|
| + (file_io_string, self.num_objects,
|
| + MakeHumanReadable(self.thru_filesize)))
|
| + self._WarnIfLargeData()
|
| +
|
| + self.results[test_name] = {'file_size': self.thru_filesize,
|
| + 'processes': self.processes,
|
| + 'threads': self.threads,
|
| + 'parallelism': self.parallel_strategy}
|
| +
|
| +    # Warm up the TCP connection.
|
| + warmup_obj_name = os.path.basename(self.tcp_warmup_file)
|
| + self.temporary_objects.add(warmup_obj_name)
|
| + self.Upload(self.tcp_warmup_file, warmup_obj_name)
|
| +
|
| + if use_file:
|
| +      # For the file I/O test, use N files on disk to preserve seek
|
| +      # performance.
|
| + file_names = self.thru_file_names
|
| + object_names = self.thru_object_names
|
| + else:
|
| +      # For the in-memory test, use only one file but copy it num_objects
|
| +      # times, so the test scales without excessive memory usage.
|
| + file_names = [self.mem_thru_file_name] * self.num_objects
|
| + object_names = (
|
| + [self.mem_thru_object_name + str(i) for i in range(self.num_objects)])
|
|
|
| - self._RunOperation(_Upload)
|
| - time_took = sum(times)
|
| + for object_name in object_names:
|
| + self.temporary_objects.add(object_name)
|
|
|
| + t0 = time.time()
|
| + if self.processes == 1 and self.threads == 1:
|
| + for i in range(self.num_objects):
|
| + self.Upload(file_names[i], object_names[i], use_file)
|
| else:
|
| - args = thru_tuples
|
| - t0 = time.time()
|
| - self.Apply(_UploadWrapper,
|
| - args,
|
| - _PerfdiagExceptionHandler,
|
| - arg_checker=DummyArgChecker,
|
| - parallel_operations_override=True,
|
| - process_count=self.processes,
|
| - thread_count=self.threads)
|
| - t1 = time.time()
|
| - time_took = t1 - t0
|
| + if self.parallel_strategy in (self.FAN, self.BOTH):
|
| + need_to_slice = (self.parallel_strategy == self.BOTH)
|
| + self.PerformFannedUpload(need_to_slice, file_names, object_names,
|
| + use_file)
|
| + elif self.parallel_strategy == self.SLICE:
|
| + for i in range(self.num_objects):
|
| + self.PerformSlicedUpload(file_names[i], object_names[i], use_file)
|
| + t1 = time.time()
|
|
|
| - total_bytes_copied = self.thru_filesize * self.num_iterations
|
| + time_took = t1 - t0
|
| + total_bytes_copied = self.thru_filesize * self.num_objects
|
| bytes_per_second = total_bytes_copied / time_took
|
|
|
| - self.results['write_throughput']['time_took'] = time_took
|
| - self.results['write_throughput']['total_bytes_copied'] = total_bytes_copied
|
| - self.results['write_throughput']['bytes_per_second'] = bytes_per_second
|
| + self.results[test_name]['time_took'] = time_took
|
| + self.results[test_name]['total_bytes_copied'] = total_bytes_copied
|
| + self.results[test_name]['bytes_per_second'] = bytes_per_second
|
|
|
| def _RunListTests(self):
|
| """Runs eventual consistency listing latency tests."""
|
| - self.results['listing'] = {'num_files': self.num_iterations}
|
| + self.results['listing'] = {'num_files': self.num_objects}
|
|
|
| - # Generate N random object names to put in the bucket.
|
| + # Generate N random objects to put into the bucket.
|
| list_prefix = 'gsutil-perfdiag-list-'
|
| + list_fpaths = []
|
| list_objects = []
|
| - for _ in xrange(self.num_iterations):
|
| - list_object_name = u'%s%s' % (list_prefix, os.urandom(20).encode('hex'))
|
| - self.test_object_names.add(list_object_name)
|
| - list_objects.append(list_object_name)
|
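| +    # Each listing object is a zero-byte temporary file; only its name
|
| +    # matters for the listing test.
|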
| + args = []
|
| + for _ in xrange(self.num_objects):
|
| + fpath = self._MakeTempFile(0, mem_data=True, mem_metadata=True,
|
| + prefix=list_prefix)
|
| + list_fpaths.append(fpath)
|
| + object_name = os.path.basename(fpath)
|
| + list_objects.append(object_name)
|
| + args.append(FanUploadTuple(False, fpath, object_name, False))
|
| + self.temporary_objects.add(object_name)
|
|
|
| # Add the objects to the bucket.
|
| self.logger.info(
|
| - '\nWriting %s objects for listing test...', self.num_iterations)
|
| - empty_md5 = CalculateB64EncodedMd5FromContents(cStringIO.StringIO(''))
|
| - args = [
|
| - UploadObjectTuple(self.bucket_url.bucket_name, name, md5=empty_md5,
|
| - contents='') for name in list_objects]
|
| - self.Apply(_UploadWrapper, args, _PerfdiagExceptionHandler,
|
| + '\nWriting %s objects for listing test...', self.num_objects)
|
| +
|
| + self.Apply(_UploadObject, args, _PerfdiagExceptionHandler,
|
| arg_checker=DummyArgChecker)
|
|
|
| list_latencies = []
|
| @@ -743,7 +1050,7 @@ class PerfDiagCommand(Command):
|
| """Lists and returns objects in the bucket. Also records latency."""
|
| t0 = time.time()
|
| objects = list(self.gsutil_api.ListObjects(
|
| - self.bucket_url.bucket_name, prefix=list_prefix, delimiter='/',
|
| + self.bucket_url.bucket_name, delimiter='/',
|
| provider=self.provider, fields=['items/name']))
|
| t1 = time.time()
|
| list_latencies.append(t1 - t0)
|
| @@ -751,7 +1058,7 @@ class PerfDiagCommand(Command):
|
|
|
| self.logger.info(
|
| 'Listing bucket %s waiting for %s objects to appear...',
|
| - self.bucket_url.bucket_name, self.num_iterations)
|
| + self.bucket_url.bucket_name, self.num_objects)
|
| while expected_objects - found_objects:
|
| def _ListAfterUpload():
|
| names = _List()
|
| @@ -771,14 +1078,15 @@ class PerfDiagCommand(Command):
|
| 'time_took': total_end_time - total_start_time,
|
| }
|
|
|
| +    args = list(list_objects)
|
| self.logger.info(
|
| - 'Deleting %s objects for listing test...', self.num_iterations)
|
| + 'Deleting %s objects for listing test...', self.num_objects)
|
| self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler,
|
| arg_checker=DummyArgChecker)
|
|
|
| self.logger.info(
|
| 'Listing bucket %s waiting for %s objects to disappear...',
|
| - self.bucket_url.bucket_name, self.num_iterations)
|
| + self.bucket_url.bucket_name, self.num_objects)
|
| list_latencies = []
|
| files_seen = []
|
| total_start_time = time.time()
|
| @@ -802,45 +1110,102 @@ class PerfDiagCommand(Command):
|
| 'time_took': total_end_time - total_start_time,
|
| }
|
|
|
| - def Upload(self, thru_tuple, thread_state=None):
|
| - gsutil_api = GetCloudApiInstance(self, thread_state)
|
| -
|
| - md5hash = thru_tuple.md5
|
| - contents = thru_tuple.contents
|
| - if thru_tuple.filepath:
|
| - md5hash = self.file_md5s[thru_tuple.filepath]
|
| - contents = self.file_contents[thru_tuple.filepath]
|
| -
|
| - upload_target = apitools_messages.Object(
|
| - bucket=thru_tuple.bucket_name, name=thru_tuple.object_name,
|
| - md5Hash=md5hash)
|
| - file_size = len(contents)
|
| - if file_size < ResumableThreshold():
|
| - gsutil_api.UploadObject(
|
| - cStringIO.StringIO(contents), upload_target,
|
| - provider=self.provider, size=file_size, fields=['name'])
|
| - else:
|
| - gsutil_api.UploadObjectResumable(
|
| - cStringIO.StringIO(contents), upload_target,
|
| - provider=self.provider, size=file_size, fields=['name'],
|
| - tracker_callback=_DummyTrackerCallback)
|
| + def Upload(self, file_name, object_name, use_file=False, file_start=0,
|
| + file_size=None):
|
| + """Performs an upload to the test bucket.
|
|
|
| - def Download(self, download_tuple, thread_state=None):
|
| - """Downloads a file.
|
| + The file is uploaded to the bucket referred to by self.bucket_url, and has
|
| + name object_name.
|
|
|
| Args:
|
| - download_tuple: (bucket name, object name, serialization data for object).
|
| - thread_state: gsutil Cloud API instance to use for the download.
|
| + file_name: The path to the local file, and the key to its entry in
|
| + temp_file_dict.
|
| + object_name: The name of the remote object.
|
| + use_file: If true, use disk I/O, otherwise read everything from memory.
|
| + file_start: The first byte in the file to upload to the object.
|
| +          (should only be specified for sliced uploads)
|
| + file_size: The size of the file to upload.
|
| +          (should only be specified for sliced uploads)
|
| +
|
| + Returns:
|
| + Uploaded Object Metadata.
|
| """
|
| - gsutil_api = GetCloudApiInstance(self, thread_state)
|
| - gsutil_api.GetObjectMedia(
|
| - download_tuple[0], download_tuple[1], self.discard_sink,
|
| - provider=self.provider, serialization_data=download_tuple[2])
|
| + fp = None
|
| + if file_size is None:
|
| + file_size = temp_file_dict[file_name].size
|
|
|
| - def Delete(self, thru_tuple, thread_state=None):
|
| - gsutil_api = thread_state or self.gsutil_api
|
| - gsutil_api.DeleteObject(
|
| - thru_tuple.bucket_name, thru_tuple.object_name, provider=self.provider)
|
| + upload_url = self.bucket_url.Clone()
|
| + upload_url.object_name = object_name
|
| + upload_target = StorageUrlToUploadObjectMetadata(upload_url)
|
| +
|
| + try:
|
| + if use_file:
|
| + fp = FilePart(file_name, file_start, file_size)
|
| + else:
|
| + data = temp_file_dict[file_name].data[file_start:file_start+file_size]
|
| + fp = cStringIO.StringIO(data)
|
| +
|
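| +      # Use a one-shot upload below the resumable threshold and a
|
| +      # resumable upload (with a no-op tracker callback) above it.
|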
| + def _InnerUpload():
|
| + if file_size < ResumableThreshold():
|
| + return self.gsutil_api.UploadObject(
|
| + fp, upload_target, provider=self.provider, size=file_size,
|
| + fields=['name', 'mediaLink', 'size'])
|
| + else:
|
| + return self.gsutil_api.UploadObjectResumable(
|
| + fp, upload_target, provider=self.provider, size=file_size,
|
| + fields=['name', 'mediaLink', 'size'],
|
| + tracker_callback=_DummyTrackerCallback)
|
| + return self._RunOperation(_InnerUpload)
|
| + finally:
|
| + if fp:
|
| + fp.close()
|
| +
|
| + def Download(self, object_name, file_name=None, serialization_data=None,
|
| + start_byte=0, end_byte=None):
|
| + """Downloads an object from the test bucket.
|
| +
|
| + Args:
|
| + object_name: The name of the object (in the test bucket) to download.
|
| + file_name: Optional file name to write downloaded data to. If None,
|
| + downloaded data is discarded immediately.
|
| + serialization_data: Optional serialization data, used so that we don't
|
| + have to get the metadata before downloading.
|
| + start_byte: The first byte in the object to download.
|
| +          (should only be specified for sliced downloads)
|
| + end_byte: The last byte in the object to download.
|
| +          (should only be specified for sliced downloads)
|
| + """
|
| + fp = None
|
| + try:
|
| + if file_name is not None:
|
| + fp = open(file_name, 'r+b')
|
| + fp.seek(start_byte)
|
| + else:
|
| + fp = self.discard_sink
|
| +
|
| + def _InnerDownload():
|
| + self.gsutil_api.GetObjectMedia(
|
| + self.bucket_url.bucket_name, object_name, fp,
|
| + provider=self.provider, start_byte=start_byte, end_byte=end_byte,
|
| + serialization_data=serialization_data)
|
| + self._RunOperation(_InnerDownload)
|
| + finally:
|
| + if fp:
|
| + fp.close()
|
| +
|
| + def Delete(self, object_name):
|
| + """Deletes an object from the test bucket.
|
| +
|
| + Args:
|
| + object_name: The name of the object to delete.
|
| + """
|
| + try:
|
| + def _InnerDelete():
|
| + self.gsutil_api.DeleteObject(self.bucket_url.bucket_name, object_name,
|
| + provider=self.provider)
|
| + self._RunOperation(_InnerDelete)
|
| + except NotFoundException:
|
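| +      # The object may already be gone; treat a missing object as deleted.
|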
| + pass
|
|
|
| def _GetDiskCounters(self):
|
| """Retrieves disk I/O statistics for all disks.
|
| @@ -966,7 +1331,7 @@ class PerfDiagCommand(Command):
|
| sysinfo['ip_address'] = ''
|
| # Record the temporary directory used since it can affect performance, e.g.
|
| # when on a networked filesystem.
|
| - sysinfo['tempdir'] = tempfile.gettempdir()
|
| + sysinfo['tempdir'] = self.directory
|
|
|
| # Produces an RFC 2822 compliant GMT timestamp.
|
| sysinfo['gmt_timestamp'] = time.strftime('%a, %d %b %Y %H:%M:%S +0000',
|
| @@ -1178,12 +1543,27 @@ class PerfDiagCommand(Command):
|
| print 'Write Throughput'.center(78)
|
| print '-' * 78
|
| write_thru = self.results['write_throughput']
|
| - print 'Copied a %s file %d times for a total transfer size of %s.' % (
|
| + print 'Copied %s %s file(s) for a total transfer size of %s.' % (
|
| + self.num_objects,
|
| MakeHumanReadable(write_thru['file_size']),
|
| - write_thru['num_copies'],
|
| MakeHumanReadable(write_thru['total_bytes_copied']))
|
| print 'Write throughput: %s/s.' % (
|
| MakeBitsHumanReadable(write_thru['bytes_per_second'] * 8))
|
| + print 'Parallelism strategy: %s' % write_thru['parallelism']
|
| +
|
| + if 'write_throughput_file' in self.results:
|
| + print
|
| + print '-' * 78
|
| + print 'Write Throughput With File I/O'.center(78)
|
| + print '-' * 78
|
| + write_thru_file = self.results['write_throughput_file']
|
| + print 'Copied %s %s file(s) for a total transfer size of %s.' % (
|
| + self.num_objects,
|
| + MakeHumanReadable(write_thru_file['file_size']),
|
| + MakeHumanReadable(write_thru_file['total_bytes_copied']))
|
| + print 'Write throughput: %s/s.' % (
|
| + MakeBitsHumanReadable(write_thru_file['bytes_per_second'] * 8))
|
| + print 'Parallelism strategy: %s' % write_thru_file['parallelism']
|
|
|
| if 'read_throughput' in self.results:
|
| print
|
| @@ -1191,12 +1571,27 @@ class PerfDiagCommand(Command):
|
| print 'Read Throughput'.center(78)
|
| print '-' * 78
|
| read_thru = self.results['read_throughput']
|
| - print 'Copied a %s file %d times for a total transfer size of %s.' % (
|
| + print 'Copied %s %s file(s) for a total transfer size of %s.' % (
|
| + self.num_objects,
|
| MakeHumanReadable(read_thru['file_size']),
|
| - read_thru['num_times'],
|
| MakeHumanReadable(read_thru['total_bytes_copied']))
|
| print 'Read throughput: %s/s.' % (
|
| MakeBitsHumanReadable(read_thru['bytes_per_second'] * 8))
|
| + print 'Parallelism strategy: %s' % read_thru['parallelism']
|
| +
|
| + if 'read_throughput_file' in self.results:
|
| + print
|
| + print '-' * 78
|
| + print 'Read Throughput With File I/O'.center(78)
|
| + print '-' * 78
|
| + read_thru_file = self.results['read_throughput_file']
|
| + print 'Copied %s %s file(s) for a total transfer size of %s.' % (
|
| + self.num_objects,
|
| + MakeHumanReadable(read_thru_file['file_size']),
|
| + MakeHumanReadable(read_thru_file['total_bytes_copied']))
|
| + print 'Read throughput: %s/s.' % (
|
| + MakeBitsHumanReadable(read_thru_file['bytes_per_second'] * 8))
|
| + print 'Parallelism strategy: %s' % read_thru_file['parallelism']
|
|
|
| if 'listing' in self.results:
|
| print
|
| @@ -1389,15 +1784,23 @@ class PerfDiagCommand(Command):
|
| def _ParseArgs(self):
|
| """Parses arguments for perfdiag command."""
|
| # From -n.
|
| - self.num_iterations = 5
|
| + self.num_objects = 5
|
| # From -c.
|
| self.processes = 1
|
| # From -k.
|
| self.threads = 1
|
| + # From -p
|
| + self.parallel_strategy = None
|
| + # From -y
|
| + self.num_slices = 4
|
| # From -s.
|
| self.thru_filesize = 1048576
|
| + # From -d.
|
| + self.directory = tempfile.gettempdir()
|
| + # Keep track of whether or not to delete the directory upon completion.
|
| + self.delete_directory = False
|
| # From -t.
|
| - self.diag_tests = self.DEFAULT_DIAG_TESTS
|
| + self.diag_tests = set(self.DEFAULT_DIAG_TESTS)
|
| # From -o.
|
| self.output_file = None
|
| # From -i.
|
| @@ -1408,7 +1811,7 @@ class PerfDiagCommand(Command):
|
| if self.sub_opts:
|
| for o, a in self.sub_opts:
|
| if o == '-n':
|
| - self.num_iterations = self._ParsePositiveInteger(
|
| + self.num_objects = self._ParsePositiveInteger(
|
| a, 'The -n parameter must be a positive integer.')
|
| if o == '-c':
|
| self.processes = self._ParsePositiveInteger(
|
| @@ -1416,21 +1819,32 @@ class PerfDiagCommand(Command):
|
| if o == '-k':
|
| self.threads = self._ParsePositiveInteger(
|
| a, 'The -k parameter must be a positive integer.')
|
| + if o == '-p':
|
| + if a.lower() in self.PARALLEL_STRATEGIES:
|
| + self.parallel_strategy = a.lower()
|
| + else:
|
| + raise CommandException(
|
| + "'%s' is not a valid parallelism strategy." % a)
|
| + if o == '-y':
|
| + self.num_slices = self._ParsePositiveInteger(
|
| + a, 'The -y parameter must be a positive integer.')
|
| if o == '-s':
|
| try:
|
| self.thru_filesize = HumanReadableToBytes(a)
|
| except ValueError:
|
| raise CommandException('Invalid -s parameter.')
|
| - if self.thru_filesize > (20 * 1024 ** 3): # Max 20 GiB.
|
| - raise CommandException(
|
| - 'Maximum throughput file size parameter (-s) is 20 GiB.')
|
| + if o == '-d':
|
| + self.directory = a
|
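| +          # Only remove the directory at teardown if perfdiag created it.
|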
| + if not os.path.exists(self.directory):
|
| + self.delete_directory = True
|
| + os.makedirs(self.directory)
|
| if o == '-t':
|
| - self.diag_tests = []
|
| + self.diag_tests = set()
|
| for test_name in a.strip().split(','):
|
| if test_name.lower() not in self.ALL_DIAG_TESTS:
|
| raise CommandException("List of test names (-t) contains invalid "
|
| "test name '%s'." % test_name)
|
| - self.diag_tests.append(test_name)
|
| + self.diag_tests.add(test_name)
|
| if o == '-m':
|
| pieces = a.split(':')
|
| if len(pieces) != 2:
|
| @@ -1452,15 +1866,39 @@ class PerfDiagCommand(Command):
|
| raise CommandException("Could not decode input file (-i): '%s'." %
|
| a)
|
| return
|
| +
|
| + # If parallelism is specified, default parallelism strategy to fan.
|
| + if (self.processes > 1 or self.threads > 1) and not self.parallel_strategy:
|
| + self.parallel_strategy = self.FAN
|
| + elif self.processes == 1 and self.threads == 1 and self.parallel_strategy:
|
| + raise CommandException(
|
| + 'Cannot specify parallelism strategy (-p) without also specifying '
|
| + 'multiple threads and/or processes (-c and/or -k).')
|
| +
|
| if not self.args:
|
| self.RaiseWrongNumberOfArgumentsException()
|
|
|
| self.bucket_url = StorageUrlFromString(self.args[0])
|
| self.provider = self.bucket_url.scheme
|
|      if not (self.bucket_url.IsCloudUrl() and self.bucket_url.IsBucket()):
|
| raise CommandException('The perfdiag command requires a URL that '
|
| 'specifies a bucket.\n"%s" is not '
|
| 'valid.' % self.args[0])
|
| +
|
| + if (self.thru_filesize > HumanReadableToBytes('2GiB') and
|
| + (self.RTHRU in self.diag_tests or self.WTHRU in self.diag_tests)):
|
| + raise CommandException(
|
| +          'For in-memory tests, the maximum file size is 2 GiB. For larger '
|
| +          'file sizes, specify rthru_file and/or wthru_file with the -t '
|
| +          'option.')
|
| +
|
| + perform_slice = self.parallel_strategy in (self.SLICE, self.BOTH)
|
| + slice_not_available = (
|
| +        self.provider == 's3' and self.diag_tests.intersection(
|
| +            (self.WTHRU, self.WTHRU_FILE)))
|
| + if perform_slice and slice_not_available:
|
| + raise CommandException('Sliced uploads are not available for s3. '
|
| + 'Use -p fan or sequential uploads for s3.')
|
| +
|
| # Ensure the bucket exists.
|
| self.gsutil_api.GetBucket(self.bucket_url.bucket_name,
|
| provider=self.bucket_url.scheme,
|
| @@ -1487,12 +1925,14 @@ class PerfDiagCommand(Command):
|
| 'Base bucket URI: %s\n'
|
| 'Number of processes: %d\n'
|
| 'Number of threads: %d\n'
|
| + 'Parallelism strategy: %s\n'
|
| 'Throughput file size: %s\n'
|
| 'Diagnostics to run: %s',
|
| - self.num_iterations,
|
| + self.num_objects,
|
| self.bucket_url,
|
| self.processes,
|
| self.threads,
|
| + self.parallel_strategy,
|
| MakeHumanReadable(self.thru_filesize),
|
| (', '.join(self.diag_tests)))
|
|
|
| @@ -1516,6 +1956,12 @@ class PerfDiagCommand(Command):
|
| self._RunLatencyTests()
|
| if self.RTHRU in self.diag_tests:
|
| self._RunReadThruTests()
|
| + # Run WTHRU_FILE before RTHRU_FILE. If data is created in WTHRU_FILE it
|
| + # will be used in RTHRU_FILE to save time and bandwidth.
|
| + if self.WTHRU_FILE in self.diag_tests:
|
| + self._RunWriteThruTests(use_file=True)
|
| + if self.RTHRU_FILE in self.diag_tests:
|
| + self._RunReadThruTests(use_file=True)
|
| if self.WTHRU in self.diag_tests:
|
| self._RunWriteThruTests()
|
| if self.LIST in self.diag_tests:
|
| @@ -1535,6 +1981,7 @@ class PerfDiagCommand(Command):
|
| self.results['gsutil_version'] = gslib.VERSION
|
| self.results['boto_version'] = boto.__version__
|
|
|
| + self._TearDown()
|
| self._DisplayResults()
|
| finally:
|
| # TODO: Install signal handlers so this is performed in response to a
|
| @@ -1545,40 +1992,6 @@ class PerfDiagCommand(Command):
|
| return 0
|
|
|
|
|
| -class UploadObjectTuple(object):
|
| - """Picklable tuple with necessary metadata for an insert object call."""
|
| -
|
| - def __init__(self, bucket_name, object_name, filepath=None, md5=None,
|
| - contents=None):
|
| - """Create an upload tuple.
|
| -
|
| - Args:
|
| - bucket_name: Name of the bucket to upload to.
|
| - object_name: Name of the object to upload to.
|
| - filepath: A file path located in self.file_contents and self.file_md5s.
|
| - md5: The MD5 hash of the object being uploaded.
|
| - contents: The contents of the file to be uploaded.
|
| -
|
| - Note: (contents + md5) and filepath are mutually exlusive. You may specify
|
| - one or the other, but not both.
|
| - Note: If one of contents or md5 are specified, they must both be specified.
|
| -
|
| - Raises:
|
| - InvalidArgument: if the arguments are invalid.
|
| - """
|
| - self.bucket_name = bucket_name
|
| - self.object_name = object_name
|
| - self.filepath = filepath
|
| - self.md5 = md5
|
| - self.contents = contents
|
| - if filepath and (md5 or contents is not None):
|
| - raise InvalidArgument(
|
| - 'Only one of filepath or (md5 + contents) may be specified.')
|
| - if not filepath and (not md5 or contents is None):
|
| - raise InvalidArgument(
|
| - 'Both md5 and contents must be specified.')
|
| -
|
| -
|
| def StorageUrlToUploadObjectMetadata(storage_url):
|
| if storage_url.IsCloudUrl() and storage_url.IsObject():
|
| upload_target = apitools_messages.Object()
|
|
|