Index: tools/telemetry/third_party/gsutilz/gslib/commands/perfdiag.py
|
diff --git a/tools/telemetry/third_party/gsutilz/gslib/commands/perfdiag.py b/tools/telemetry/third_party/gsutilz/gslib/commands/perfdiag.py
|
index d88eae78c1242e823b9be169e8f2f9f103a15a23..f95545bdd85537ef30ed7db418ed6a1760492bc4 100644
|
--- a/tools/telemetry/third_party/gsutilz/gslib/commands/perfdiag.py
|
+++ b/tools/telemetry/third_party/gsutilz/gslib/commands/perfdiag.py
|
@@ -18,6 +18,7 @@ from __future__ import absolute_import
|
|
import calendar
|
from collections import defaultdict
|
+from collections import namedtuple
|
import contextlib
|
import cStringIO
|
import datetime
|
@@ -41,17 +42,21 @@ import boto.gs.connection
|
import gslib
|
from gslib.cloud_api import NotFoundException
|
from gslib.cloud_api import ServiceException
|
-from gslib.cloud_api_helper import GetDownloadSerializationDict
|
+from gslib.cloud_api_helper import GetDownloadSerializationData
|
from gslib.command import Command
|
from gslib.command import DummyArgChecker
|
from gslib.command_argument import CommandArgument
|
from gslib.commands import config
|
from gslib.cs_api_map import ApiSelector
|
from gslib.exception import CommandException
|
+from gslib.file_part import FilePart
|
from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
|
from gslib.storage_url import StorageUrlFromString
|
from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
|
+from gslib.util import CheckFreeSpace
|
+from gslib.util import DivideAndCeil
|
from gslib.util import GetCloudApiInstance
|
+from gslib.util import GetFileSize
|
from gslib.util import GetMaxRetryDelay
|
from gslib.util import HumanReadableToBytes
|
from gslib.util import IS_LINUX
|
@@ -60,11 +65,11 @@ from gslib.util import MakeHumanReadable
|
from gslib.util import Percentile
|
from gslib.util import ResumableThreshold
|
|
-
|
_SYNOPSIS = """
|
gsutil perfdiag [-i in.json]
|
- gsutil perfdiag [-o out.json] [-n iterations] [-c processes]
|
- [-k threads] [-s size] [-t tests] url...
|
+ gsutil perfdiag [-o out.json] [-n objects] [-c processes]
|
+ [-k threads] [-p parallelism type] [-y slices] [-s size] [-d directory]
|
+ [-t tests] url...
|
"""
|
|
_DETAILED_HELP_TEXT = ("""
|
@@ -99,9 +104,8 @@ _DETAILED_HELP_TEXT = ("""
|
|
|
<B>OPTIONS</B>
|
- -n Sets the number of iterations performed when downloading and
|
- uploading files during latency and throughput tests. Defaults to
|
- 5.
|
+ -n Sets the number of objects to use when downloading and uploading
|
+ files during tests. Defaults to 5.
|
|
-c Sets the number of processes to use while running throughput
|
experiments. The default value is 1.
|
@@ -110,20 +114,55 @@ _DETAILED_HELP_TEXT = ("""
|
throughput experiments. Each process will receive an equal number
|
of threads. The default value is 1.
|
|
- -s Sets the size (in bytes) of the test file used to perform read
|
- and write throughput tests. The default is 1 MiB. This can also
|
- be specified using byte suffixes such as 500K or 1M. Note: these
|
- values are interpreted as multiples of 1024 (K=1024, M=1024*1024,
|
- etc.)
|
+ Note: All specified threads and processes will be created, but may
|
+ not be saturated with work if too few objects (specified with -n)
|
+ and too few components (specified with -y) are specified.
|
+
|
+ -p Sets the type of parallelism to be used (only applicable when
|
+ threads or processes are specified and threads * processes > 1).
|
+ The default is to use fan. Must be one of the following:
|
+
|
+ fan
|
+ Use one thread per object. This is akin to using gsutil -m cp,
|
+ with sliced object download / parallel composite upload
|
+ disabled.
|
+
|
+ slice
|
+ Use Y (specified with -y) threads for each object, transferring
|
+ one object at a time. This is akin to using sliced object
|
+ download / parallel composite upload, without -m. Sliced
|
+ uploads not supported for s3.
|
+
|
+ both
|
+ Use Y (specified with -y) threads for each object, transferring
|
+ multiple objects at a time. This is akin to simultaneously
|
+ using sliced object download / parallel composite upload and
|
+ gsutil -m cp. Sliced uploads not supported for s3.
|
+
|
+ -y Sets the number of slices to divide each file/object into while
|
+ transferring data. Only applicable with the slice (or both)
|
+ parallelism type. The default is 4 slices.
|
+
|
+ -s Sets the size (in bytes) for each of the N (set with -n) objects
|
+ used in the read and write throughput tests. The default is 1 MiB.
|
+ This can also be specified using byte suffixes such as 500K or 1M.
|
+ Note: these values are interpreted as multiples of 1024 (K=1024,
|
+ M=1024*1024, etc.)
|
+ Note: If rthru_file or wthru_file are performed, N (set with -n)
|
+ times as much disk space as specified will be required for the
|
+ operation.
|
+
|
+ -d Sets the directory to store temporary local files in. If not
|
+ specified, a default temporary directory will be used.
|
|
-t Sets the list of diagnostic tests to perform. The default is to
|
- run all diagnostic tests. Must be a comma-separated list
|
- containing one or more of the following:
|
+ run the lat, rthru, and wthru diagnostic tests. Must be a
|
+ comma-separated list containing one or more of the following:
|
|
lat
|
- Runs N iterations (set with -n) of writing the file,
|
- retrieving its metadata, reading the file, and deleting
|
- the file. Records the latency of each operation.
|
+ For N (set with -n) objects, write the object, retrieve its
|
+ metadata, read the object, and finally delete the object.
|
+ Record the latency of each operation.
|
|
list
|
Write N (set with -n) objects to the bucket, record how long
|
@@ -136,15 +175,22 @@ _DETAILED_HELP_TEXT = ("""
|
Runs N (set with -n) read operations, with at most C
|
(set with -c) reads outstanding at any given time.
|
|
+ rthru_file
|
+ The same as rthru, but simultaneously writes data to the disk,
|
+ to gauge the performance impact of the local disk on downloads.
|
+
|
wthru
|
Runs N (set with -n) write operations, with at most C
|
(set with -c) writes outstanding at any given time.
|
|
+ wthru_file
|
+ The same as wthru, but simultaneously reads data from the disk,
|
+ to gauge the performance impact of the local disk on uploads.
|
+
|
-m Adds metadata to the result JSON file. Multiple -m values can be
|
specified. Example:
|
|
- gsutil perfdiag -m "key1:value1" -m "key2:value2" \
|
- gs://bucketname/
|
+ gsutil perfdiag -m "key1:val1" -m "key2:val2" gs://bucketname
|
|
Each metadata key will be added to the top-level "metadata"
|
dictionary in the output JSON file.
|
@@ -178,6 +224,41 @@ _DETAILED_HELP_TEXT = ("""
|
this information will be sent to Google unless you choose to send it.
|
""")
|
|
+FileDataTuple = namedtuple(
|
+ 'FileDataTuple',
|
+ 'size md5 data')
|
+
|
+# Describes one object in a fanned download. If need_to_slice is specified as
|
+# True, the object should be downloaded with the slice strategy. Other field
|
+# names are the same as documented in PerfDiagCommand.Download.
|
+FanDownloadTuple = namedtuple(
|
+ 'FanDownloadTuple',
|
+ 'need_to_slice object_name file_name serialization_data')
|
+
|
+# Describes one slice in a sliced download.
|
+# Field names are the same as documented in PerfDiagCommand.Download.
|
+SliceDownloadTuple = namedtuple(
|
+ 'SliceDownloadTuple',
|
+ 'object_name file_name serialization_data start_byte end_byte')
|
+
|
+# Describes one file in a fanned upload. If need_to_slice is specified as
|
+# True, the file should be uploaded with the slice strategy. Other field
|
+# names are the same as documented in PerfDiagCommand.Upload.
|
+FanUploadTuple = namedtuple(
|
+ 'FanUploadTuple',
|
+ 'need_to_slice file_name object_name use_file')
|
+
|
+# Describes one slice in a sliced upload.
|
+# Field names are the same as documented in PerfDiagCommand.Upload.
|
+SliceUploadTuple = namedtuple(
|
+ 'SliceUploadTuple',
|
+ 'file_name object_name use_file file_start file_size')
|
+
|
+# Dict storing file_path:FileDataTuple for each temporary file used by
|
+# perfdiag. This data should be kept outside of the PerfDiagCommand class
|
+# since calls to Apply will make copies of all member data.
|
+temp_file_dict = {}
|
+
|
|
class Error(Exception):
|
"""Base exception class for this module."""
|
@@ -189,16 +270,73 @@ class InvalidArgument(Error):
|
pass
|
|
|
-def _DownloadWrapper(cls, arg, thread_state=None):
|
- cls.Download(arg, thread_state=thread_state)
|
+def _DownloadObject(cls, args, thread_state=None):
|
+ """Function argument to apply for performing fanned parallel downloads.
|
|
+ Args:
|
+ cls: The calling PerfDiagCommand class instance.
|
+ args: A FanDownloadTuple object describing this download.
|
+ thread_state: gsutil Cloud API instance to use for the operation.
|
+ """
|
+ cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
|
+ if args.need_to_slice:
|
+ cls.PerformSlicedDownload(args.object_name, args.file_name,
|
+ args.serialization_data)
|
+ else:
|
+ cls.Download(args.object_name, args.file_name, args.serialization_data)
|
|
-def _UploadWrapper(cls, arg, thread_state=None):
|
- cls.Upload(arg, thread_state=thread_state)
|
|
+def _DownloadSlice(cls, args, thread_state=None):
|
+ """Function argument to apply for performing sliced downloads.
|
|
-def _DeleteWrapper(cls, arg, thread_state=None):
|
- cls.Delete(arg, thread_state=thread_state)
|
+ Args:
|
+ cls: The calling PerfDiagCommand class instance.
|
+ args: A SliceDownloadTuple object describing this download.
|
+ thread_state: gsutil Cloud API instance to use for the operation.
|
+ """
|
+ cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
|
+ cls.Download(args.object_name, args.file_name, args.serialization_data,
|
+ args.start_byte, args.end_byte)
|
+
|
+
|
+def _UploadObject(cls, args, thread_state=None):
|
+ """Function argument to apply for performing fanned parallel uploads.
|
+
|
+ Args:
|
+ cls: The calling PerfDiagCommand class instance.
|
+ args: A FanUploadTuple object describing this upload.
|
+ thread_state: gsutil Cloud API instance to use for the operation.
|
+ """
|
+ cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
|
+ if args.need_to_slice:
|
+ cls.PerformSlicedUpload(args.file_name, args.object_name, args.use_file)
|
+ else:
|
+ cls.Upload(args.file_name, args.object_name, args.use_file)
|
+
|
+
|
+def _UploadSlice(cls, args, thread_state=None):
|
+ """Function argument to apply for performing sliced parallel uploads.
|
+
|
+ Args:
|
+ cls: The calling PerfDiagCommand class instance.
|
+ args: A SliceUploadTuple object describing this upload.
|
+ thread_state: gsutil Cloud API instance to use for the operation.
|
+ """
|
+ cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
|
+ cls.Upload(args.file_name, args.object_name, args.use_file,
|
+ args.file_start, args.file_size)
|
+
|
+
|
+def _DeleteWrapper(cls, object_name, thread_state=None):
|
+ """Function argument to apply for performing parallel object deletions.
|
+
|
+ Args:
|
+ cls: The calling PerfDiagCommand class instance.
|
+ object_name: The object name to delete from the test bucket.
|
+ thread_state: gsutil Cloud API instance to use for the operation.
|
+ """
|
+ cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
|
+ cls.Delete(object_name)
|
|
|
def _PerfdiagExceptionHandler(cls, e):
|
@@ -216,6 +354,9 @@ class DummyFile(object):
|
def write(self, *args, **kwargs): # pylint: disable=invalid-name
|
pass
|
|
+ def close(self): # pylint: disable=invalid-name
|
+ pass
|
+
|
|
# Many functions in perfdiag re-define a temporary function based on a
|
# variable from a loop, resulting in a false positive from the linter.
|
@@ -230,7 +371,7 @@ class PerfDiagCommand(Command):
|
usage_synopsis=_SYNOPSIS,
|
min_args=0,
|
max_args=1,
|
- supported_sub_args='n:c:k:s:t:m:i:o:',
|
+ supported_sub_args='n:c:k:p:y:s:d:t:m:i:o:',
|
file_url_ok=False,
|
provider_url_ok=False,
|
urls_start_arg=0,
|
@@ -253,7 +394,7 @@ class PerfDiagCommand(Command):
|
# Byte sizes to use for latency testing files.
|
# TODO: Consider letting the user specify these sizes with a configuration
|
# parameter.
|
- test_file_sizes = (
|
+ test_lat_file_sizes = (
|
0, # 0 bytes
|
1024, # 1 KiB
|
102400, # 100 KiB
|
@@ -262,15 +403,26 @@ class PerfDiagCommand(Command):
|
|
# Test names.
|
RTHRU = 'rthru'
|
+ RTHRU_FILE = 'rthru_file'
|
WTHRU = 'wthru'
|
+ WTHRU_FILE = 'wthru_file'
|
LAT = 'lat'
|
LIST = 'list'
|
|
+ # Parallelism strategies.
|
+ FAN = 'fan'
|
+ SLICE = 'slice'
|
+ BOTH = 'both'
|
+
|
# List of all diagnostic tests.
|
- ALL_DIAG_TESTS = (RTHRU, WTHRU, LAT, LIST)
|
+ ALL_DIAG_TESTS = (RTHRU, RTHRU_FILE, WTHRU, WTHRU_FILE, LAT, LIST)
|
+
|
# List of diagnostic tests to run by default.
|
DEFAULT_DIAG_TESTS = (RTHRU, WTHRU, LAT)
|
|
+ # List of parallelism strategies.
|
+ PARALLEL_STRATEGIES = (FAN, SLICE, BOTH)
|
+
|
# Google Cloud Storage XML API endpoint host.
|
XML_API_HOST = boto.config.get(
|
'Credentials', 'gs_host', boto.gs.connection.GSConnection.DefaultHost)
|
@@ -324,21 +476,58 @@ class PerfDiagCommand(Command):
|
"subprocess '%s'." % (p.returncode, ' '.join(cmd)))
|
return stdoutdata if return_output else p.returncode
|
|
+ def _WarnIfLargeData(self):
|
+ """Outputs a warning message if a large amount of data is being used."""
|
+ if self.num_objects * self.thru_filesize > HumanReadableToBytes('2GiB'):
|
+ self.logger.info('This is a large operation, and could take a while.')
|
+
|
+ def _MakeTempFile(self, file_size=0, mem_metadata=False,
|
+ mem_data=False, prefix='gsutil_test_file'):
|
+ """Creates a temporary file of the given size and returns its path.
|
+
|
+ Args:
|
+ file_size: The size of the temporary file to create.
|
+ mem_metadata: If true, store md5 and file size in memory at
|
+ temp_file_dict[fpath].md5 and temp_file_dict[fpath].size.
|
+ mem_data: If true, store the file data in memory at
|
+ temp_file_dict[fpath].data
|
+ prefix: The prefix to use for the temporary file. Defaults to
|
+ gsutil_test_file.
|
+
|
+ Returns:
|
+ The file path of the created temporary file.
|
+ """
|
+ fd, fpath = tempfile.mkstemp(suffix='.bin', prefix=prefix,
|
+ dir=self.directory, text=False)
|
+ with os.fdopen(fd, 'wb') as fp:
|
+ random_bytes = os.urandom(min(file_size,
|
+ self.MAX_UNIQUE_RANDOM_BYTES))
|
+ total_bytes_written = 0
|
+ while total_bytes_written < file_size:
|
+ num_bytes = min(self.MAX_UNIQUE_RANDOM_BYTES,
|
+ file_size - total_bytes_written)
|
+ fp.write(random_bytes[:num_bytes])
|
+ total_bytes_written += num_bytes
|
+
|
+ if mem_metadata or mem_data:
|
+ with open(fpath, 'rb') as fp:
|
+ file_size = GetFileSize(fp) if mem_metadata else None
|
+ md5 = CalculateB64EncodedMd5FromContents(fp) if mem_metadata else None
|
+ data = fp.read() if mem_data else None
|
+ temp_file_dict[fpath] = FileDataTuple(file_size, md5, data)
|
+
|
+ self.temporary_files.add(fpath)
|
+ return fpath
|
+
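A side note on the write loop above: files larger than the random-block cap repeat a single os.urandom block rather than generating fresh entropy for every byte. A minimal standalone sketch of that pattern, with an assumed 5 MiB cap standing in for the class's MAX_UNIQUE_RANDOM_BYTES:

    import os

    MAX_UNIQUE_RANDOM_BYTES = 5 * 1024 * 1024   # assumed cap, for illustration
    file_size = 12 * 1024 * 1024                # 12 MiB target size

    block = os.urandom(min(file_size, MAX_UNIQUE_RANDOM_BYTES))
    written = 0
    chunks = []
    while written < file_size:
        # Each pass writes at most one block; the final pass writes the rest.
        n = min(MAX_UNIQUE_RANDOM_BYTES, file_size - written)
        chunks.append(block[:n])                # 5 MiB, 5 MiB, then 2 MiB
        written += n
    assert sum(len(c) for c in chunks) == file_size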
|
def _SetUp(self):
|
"""Performs setup operations needed before diagnostics can be run."""
|
|
# Stores test result data.
|
self.results = {}
|
- # List of test files in a temporary location on disk for latency ops.
|
- self.latency_files = []
|
- # List of test objects to clean up in the test bucket.
|
- self.test_object_names = set()
|
- # Maps each test file path to its size in bytes.
|
- self.file_sizes = {}
|
- # Maps each test file to its contents as a string.
|
- self.file_contents = {}
|
- # Maps each test file to its MD5 hash.
|
- self.file_md5s = {}
|
+ # Set of file paths for local temporary files.
|
+ self.temporary_files = set()
|
+ # Set of names for test objects that exist in the test bucket.
|
+ self.temporary_objects = set()
|
# Total number of HTTP requests made.
|
self.total_requests = 0
|
# Total number of HTTP 5xx errors.
|
@@ -347,63 +536,86 @@ class PerfDiagCommand(Command):
|
self.error_responses_by_code = defaultdict(int)
|
# Total number of socket errors.
|
self.connection_breaks = 0
|
-
|
- def _MakeFile(file_size):
|
- """Creates a temporary file of the given size and returns its path."""
|
- fd, fpath = tempfile.mkstemp(suffix='.bin', prefix='gsutil_test_file',
|
- text=False)
|
- self.file_sizes[fpath] = file_size
|
- random_bytes = os.urandom(min(file_size, self.MAX_UNIQUE_RANDOM_BYTES))
|
- total_bytes = 0
|
- file_contents = ''
|
- while total_bytes < file_size:
|
- num_bytes = min(self.MAX_UNIQUE_RANDOM_BYTES, file_size - total_bytes)
|
- file_contents += random_bytes[:num_bytes]
|
- total_bytes += num_bytes
|
- self.file_contents[fpath] = file_contents
|
- with os.fdopen(fd, 'wb') as f:
|
- f.write(self.file_contents[fpath])
|
- with open(fpath, 'rb') as f:
|
- self.file_md5s[fpath] = CalculateB64EncodedMd5FromContents(f)
|
- return fpath
|
-
|
- # Create files for latency tests.
|
- for file_size in self.test_file_sizes:
|
- fpath = _MakeFile(file_size)
|
- self.latency_files.append(fpath)
|
-
|
- # Creating a file for warming up the TCP connection.
|
- self.tcp_warmup_file = _MakeFile(5 * 1024 * 1024) # 5 Mebibytes.
|
- # Remote file to use for TCP warmup.
|
- self.tcp_warmup_remote_file = (str(self.bucket_url) +
|
- os.path.basename(self.tcp_warmup_file))
|
-
|
- # Local file on disk for write throughput tests.
|
- self.thru_local_file = _MakeFile(self.thru_filesize)
|
+ # Boolean to prevent doing cleanup twice.
|
+ self.teardown_completed = False
|
+
|
+ # Create files for latency test.
|
+ if self.LAT in self.diag_tests:
|
+ self.latency_files = []
|
+ for file_size in self.test_lat_file_sizes:
|
+ fpath = self._MakeTempFile(file_size, mem_metadata=True, mem_data=True)
|
+ self.latency_files.append(fpath)
|
+
|
+ # Create files for throughput tests.
|
+ if self.diag_tests.intersection(
|
+ (self.RTHRU, self.WTHRU, self.RTHRU_FILE, self.WTHRU_FILE)):
|
+ # Create a file for warming up the TCP connection.
|
+ self.tcp_warmup_file = self._MakeTempFile(
|
+ 5 * 1024 * 1024, mem_metadata=True, mem_data=True)
|
+
|
+ # For in memory tests, throughput tests transfer the same object N times
|
+ # instead of creating N objects, in order to avoid excessive memory usage.
|
+ if self.diag_tests.intersection((self.RTHRU, self.WTHRU)):
|
+ self.mem_thru_file_name = self._MakeTempFile(
|
+ self.thru_filesize, mem_metadata=True, mem_data=True)
|
+ self.mem_thru_object_name = os.path.basename(self.mem_thru_file_name)
|
+
|
+ # For tests that use disk I/O, it is necessary to create N objects
|
+ # in order to properly measure the performance impact of seeks.
|
+ if self.diag_tests.intersection((self.RTHRU_FILE, self.WTHRU_FILE)):
|
+ # List of file names and corresponding object names to use for file
|
+ # throughput tests.
|
+ self.thru_file_names = []
|
+ self.thru_object_names = []
|
+
|
+ free_disk_space = CheckFreeSpace(self.directory)
|
+ if free_disk_space >= self.thru_filesize * self.num_objects:
|
+ self.logger.info('\nCreating %d local files each of size %s.'
|
+ % (self.num_objects,
|
+ MakeHumanReadable(self.thru_filesize)))
|
+ self._WarnIfLargeData()
|
+ for _ in range(self.num_objects):
|
+ file_name = self._MakeTempFile(self.thru_filesize,
|
+ mem_metadata=True)
|
+ self.thru_file_names.append(file_name)
|
+ self.thru_object_names.append(os.path.basename(file_name))
|
+ else:
|
+ raise CommandException(
|
+ 'Not enough free disk space for throughput files: '
|
+ '%s of disk space required, but only %s available.'
|
+ % (MakeHumanReadable(self.thru_filesize * self.num_objects),
|
+ MakeHumanReadable(free_disk_space)))
|
|
# Dummy file buffer to use for downloading that goes nowhere.
|
self.discard_sink = DummyFile()
|
|
+ # Filter out misleading progress callback output and the incorrect
|
+ # suggestion to use gsutil -m perfdiag.
|
+ self.logger.addFilter(self._PerfdiagFilter())
|
+
|
def _TearDown(self):
|
"""Performs operations to clean things up after performing diagnostics."""
|
- for fpath in self.latency_files + [self.thru_local_file,
|
- self.tcp_warmup_file]:
|
+ if not self.teardown_completed:
|
+ temp_file_dict.clear()
|
+
|
try:
|
- os.remove(fpath)
|
+ for fpath in self.temporary_files:
|
+ os.remove(fpath)
|
+ if self.delete_directory:
|
+ os.rmdir(self.directory)
|
except OSError:
|
pass
|
|
- for object_name in self.test_object_names:
|
-
|
- def _Delete():
|
- try:
|
- self.gsutil_api.DeleteObject(self.bucket_url.bucket_name,
|
- object_name,
|
- provider=self.provider)
|
- except NotFoundException:
|
- pass
|
-
|
- self._RunOperation(_Delete)
|
+ if self.threads > 1 or self.processes > 1:
|
+ args = [obj for obj in self.temporary_objects]
|
+ self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler,
|
+ arg_checker=DummyArgChecker,
|
+ parallel_operations_override=True,
|
+ process_count=self.processes, thread_count=self.threads)
|
+ else:
|
+ for object_name in self.temporary_objects:
|
+ self.Delete(object_name)
|
+ self.teardown_completed = True
|
|
@contextlib.contextmanager
|
def _Time(self, key, bucket):
|
@@ -478,12 +690,13 @@ class PerfDiagCommand(Command):
|
# Stores timing information for each category of operation.
|
self.results['latency'] = defaultdict(list)
|
|
- for i in range(self.num_iterations):
|
+ for i in range(self.num_objects):
|
self.logger.info('\nRunning latency iteration %d...', i+1)
|
for fpath in self.latency_files:
|
+ file_data = temp_file_dict[fpath]
|
url = self.bucket_url.Clone()
|
url.object_name = os.path.basename(fpath)
|
- file_size = self.file_sizes[fpath]
|
+ file_size = file_data.size
|
readable_file_size = MakeHumanReadable(file_size)
|
|
self.logger.info(
|
@@ -493,7 +706,7 @@ class PerfDiagCommand(Command):
|
upload_target = StorageUrlToUploadObjectMetadata(url)
|
|
def _Upload():
|
- io_fp = cStringIO.StringIO(self.file_contents[fpath])
|
+ io_fp = cStringIO.StringIO(file_data.data)
|
with self._Time('UPLOAD_%d' % file_size, self.results['latency']):
|
self.gsutil_api.UploadObject(
|
io_fp, upload_target, size=file_size, provider=self.provider,
|
@@ -508,8 +721,7 @@ class PerfDiagCommand(Command):
|
'mediaLink', 'size'])
|
# Download will get the metadata first if we don't pass it in.
|
download_metadata = self._RunOperation(_Metadata)
|
- serialization_dict = GetDownloadSerializationDict(download_metadata)
|
- serialization_data = json.dumps(serialization_dict)
|
+ serialization_data = GetDownloadSerializationData(download_metadata)
|
|
def _Download():
|
with self._Time('DOWNLOAD_%d' % file_size, self.results['latency']):
|
@@ -524,213 +736,308 @@ class PerfDiagCommand(Command):
|
provider=self.provider)
|
self._RunOperation(_Delete)
|
|
- class _CpFilter(logging.Filter):
|
+ class _PerfdiagFilter(logging.Filter):
|
|
def filter(self, record):
|
- # Used to prevent cp._LogCopyOperation from spewing output from
|
- # subprocesses about every iteration.
|
+ # Used to prevent unnecessary output when using multiprocessing.
|
msg = record.getMessage()
|
return not (('Copying file:///' in msg) or ('Copying gs://' in msg) or
|
- ('Computing CRC' in msg))
|
+ ('Computing CRC' in msg) or ('gsutil -m perfdiag' in msg))
|
|
def _PerfdiagExceptionHandler(self, e):
|
"""Simple exception handler to allow post-completion status."""
|
self.logger.error(str(e))
|
|
- def _RunReadThruTests(self):
|
- """Runs read throughput tests."""
|
- self.logger.info(
|
- '\nRunning read throughput tests (%s iterations of size %s)' %
|
- (self.num_iterations, MakeHumanReadable(self.thru_filesize)))
|
-
|
- self.results['read_throughput'] = {'file_size': self.thru_filesize,
|
- 'num_times': self.num_iterations,
|
- 'processes': self.processes,
|
- 'threads': self.threads}
|
-
|
- # Copy the TCP warmup file.
|
- warmup_url = self.bucket_url.Clone()
|
- warmup_url.object_name = os.path.basename(self.tcp_warmup_file)
|
- warmup_target = StorageUrlToUploadObjectMetadata(warmup_url)
|
- self.test_object_names.add(warmup_url.object_name)
|
-
|
- def _Upload1():
|
- self.gsutil_api.UploadObject(
|
- cStringIO.StringIO(self.file_contents[self.tcp_warmup_file]),
|
- warmup_target, provider=self.provider, fields=['name'])
|
- self._RunOperation(_Upload1)
|
-
|
- # Copy the file to remote location before reading.
|
- thru_url = self.bucket_url.Clone()
|
- thru_url.object_name = os.path.basename(self.thru_local_file)
|
- thru_target = StorageUrlToUploadObjectMetadata(thru_url)
|
- thru_target.md5Hash = self.file_md5s[self.thru_local_file]
|
- self.test_object_names.add(thru_url.object_name)
|
-
|
- # Get the mediaLink here so that we can pass it to download.
|
- def _Upload2():
|
- return self.gsutil_api.UploadObject(
|
- cStringIO.StringIO(self.file_contents[self.thru_local_file]),
|
- thru_target, provider=self.provider, size=self.thru_filesize,
|
- fields=['name', 'mediaLink', 'size'])
|
-
|
- # Get the metadata for the object so that we are just measuring performance
|
- # on the actual bytes transfer.
|
- download_metadata = self._RunOperation(_Upload2)
|
- serialization_dict = GetDownloadSerializationDict(download_metadata)
|
- serialization_data = json.dumps(serialization_dict)
|
+ def PerformFannedDownload(self, need_to_slice, object_names, file_names,
|
+ serialization_data):
|
+ """Performs a parallel download of multiple objects using the fan strategy.
|
|
- if self.processes == 1 and self.threads == 1:
|
+ Args:
|
+ need_to_slice: If True, additionally apply the slice strategy to each
|
+ object in object_names.
|
+ object_names: A list of object names to be downloaded. Each object must
|
+ already exist in the test bucket.
|
+ file_names: A list, corresponding by index to object_names, of file names
|
+ for downloaded data. If None, discard downloaded data.
|
+ serialization_data: A list, corresponding by index to object_names,
|
+ of serialization data for each object.
|
+ """
|
+ args = []
|
+ for i in range(len(object_names)):
|
+ file_name = file_names[i] if file_names else None
|
+ args.append(FanDownloadTuple(
|
+ need_to_slice, object_names[i], file_name,
|
+ serialization_data[i]))
|
+ self.Apply(_DownloadObject, args, _PerfdiagExceptionHandler,
|
+ ('total_requests', 'request_errors'),
|
+ arg_checker=DummyArgChecker, parallel_operations_override=True,
|
+ process_count=self.processes, thread_count=self.threads)
|
+
|
+ def PerformSlicedDownload(self, object_name, file_name, serialization_data):
|
+ """Performs a download of an object using the slice strategy.
|
|
- # Warm up the TCP connection.
|
- def _Warmup():
|
- self.gsutil_api.GetObjectMedia(warmup_url.bucket_name,
|
- warmup_url.object_name,
|
- self.discard_sink,
|
- provider=self.provider)
|
- self._RunOperation(_Warmup)
|
+ Args:
|
+ object_name: The name of the object to download.
|
+ file_name: The name of the file to download data to, or None if data
|
+ should be discarded.
|
+ serialization_data: The serialization data for the object.
|
+ """
|
+ if file_name:
|
+ with open(file_name, 'ab') as fp:
|
+ fp.truncate(self.thru_filesize)
|
+ component_size = DivideAndCeil(self.thru_filesize, self.num_slices)
|
+ args = []
|
+ for i in range(self.num_slices):
|
+ start_byte = i * component_size
|
+ end_byte = min((i + 1) * (component_size) - 1, self.thru_filesize - 1)
|
+ args.append(SliceDownloadTuple(object_name, file_name, serialization_data,
|
+ start_byte, end_byte))
|
+ self.Apply(_DownloadSlice, args, _PerfdiagExceptionHandler,
|
+ ('total_requests', 'request_errors'),
|
+ arg_checker=DummyArgChecker, parallel_operations_override=True,
|
+ process_count=self.processes, thread_count=self.threads)
|
+
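To make the slice arithmetic above concrete, a small self-contained sketch of the inclusive byte ranges it produces for a 10-byte object split into 4 slices (DivideAndCeil is ceiling division, so component_size is 3):

    def slice_ranges(filesize, num_slices):
        # Mirrors the start_byte/end_byte arithmetic in PerformSlicedDownload.
        component_size = -(-filesize // num_slices)   # ceiling division
        return [(i * component_size,
                 min((i + 1) * component_size - 1, filesize - 1))
                for i in range(num_slices)]

    print(slice_ranges(10, 4))   # [(0, 2), (3, 5), (6, 8), (9, 9)]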
|
+ def PerformFannedUpload(self, need_to_slice, file_names, object_names,
|
+ use_file):
|
+ """Performs a parallel upload of multiple files using the fan strategy.
|
+
|
+ The metadata for file_name should be present in temp_file_dict prior
|
+ to calling. Also, the data for file_name should be present in temp_file_dict
|
+ if use_file is specified as False.
|
|
- times = []
|
+ Args:
|
+ need_to_slice: If True, additionally apply the slice strategy to each
|
+ file in file_names.
|
+ file_names: A list of file names to be uploaded.
|
+ object_names: A list, corresponding by by index to file_names, of object
|
+ names to upload data to.
|
+ use_file: If true, use disk I/O, otherwise read upload data from memory.
|
+ """
|
+ args = []
|
+ for i in range(len(file_names)):
|
+ args.append(FanUploadTuple(
|
+ need_to_slice, file_names[i], object_names[i], use_file))
|
+ self.Apply(_UploadObject, args, _PerfdiagExceptionHandler,
|
+ ('total_requests', 'request_errors'),
|
+ arg_checker=DummyArgChecker, parallel_operations_override=True,
|
+ process_count=self.processes, thread_count=self.threads)
|
+
|
+ def PerformSlicedUpload(self, file_name, object_name, use_file):
|
+ """Performs a parallel upload of a file using the slice strategy.
|
+
|
+ The metadata for file_name should be present in temp_file_dict prior
|
+ to calling. Also, the data for file_name should be present in
|
+ temp_file_dict if use_file is specified as False.
|
|
- def _Download():
|
- t0 = time.time()
|
- self.gsutil_api.GetObjectMedia(
|
- thru_url.bucket_name, thru_url.object_name, self.discard_sink,
|
- provider=self.provider, serialization_data=serialization_data)
|
- t1 = time.time()
|
- times.append(t1 - t0)
|
- for _ in range(self.num_iterations):
|
- self._RunOperation(_Download)
|
- time_took = sum(times)
|
+ Args:
|
+ file_name: The name of the file to upload.
|
+ object_name: The name of the object to upload to.
|
+ use_file: If true, use disk I/O, otherwise read upload data from memory.
|
+ """
|
+ # Divide the file into components.
|
+ component_size = DivideAndCeil(self.thru_filesize, self.num_slices)
|
+ component_object_names = (
|
+ [object_name + str(i) for i in range(self.num_slices)])
|
+
|
+ args = []
|
+ for i in range(self.num_slices):
|
+ component_start = i * component_size
|
+ slice_size = min(component_size,
|
+ temp_file_dict[file_name].size - component_start)
|
+ args.append(SliceUploadTuple(file_name, component_object_names[i],
|
+ use_file, component_start, slice_size))
|
+
|
+ # Upload the components in parallel.
|
+ try:
|
+ self.Apply(_UploadSlice, args, _PerfdiagExceptionHandler,
|
+ ('total_requests', 'request_errors'),
|
+ arg_checker=DummyArgChecker, parallel_operations_override=True,
|
+ process_count=self.processes, thread_count=self.threads)
|
+
|
+ # Compose the components into an object.
|
+ request_components = []
|
+ for i in range(self.num_slices):
|
+ src_obj_metadata = (
|
+ apitools_messages.ComposeRequest.SourceObjectsValueListEntry(
|
+ name=component_object_names[i]))
|
+ request_components.append(src_obj_metadata)
|
+
|
+ dst_obj_metadata = apitools_messages.Object()
|
+ dst_obj_metadata.name = object_name
|
+ dst_obj_metadata.bucket = self.bucket_url.bucket_name
|
+ def _Compose():
|
+ self.gsutil_api.ComposeObject(request_components, dst_obj_metadata,
|
+ provider=self.provider)
|
+ self._RunOperation(_Compose)
|
+ finally:
|
+ # Delete the temporary components.
|
+ self.Apply(_DeleteWrapper, component_object_names,
|
+ _PerfdiagExceptionHandler,
|
+ ('total_requests', 'request_errors'),
|
+ arg_checker=DummyArgChecker, parallel_operations_override=True,
|
+ process_count=self.processes, thread_count=self.threads)
|
+
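For reference, the temporary component objects above are named object_name + '0' through object_name + str(num_slices - 1), each holding one contiguous span of the file before the final compose. A sketch of that bookkeeping under the same 10-byte, 4-slice assumption:

    def slice_components(object_name, file_size, num_slices):
        # (component object name, start offset, length) for each uploaded slice.
        component_size = -(-file_size // num_slices)  # ceiling division
        return [(object_name + str(i),
                 i * component_size,
                 min(component_size, file_size - i * component_size))
                for i in range(num_slices)]

    print(slice_components('obj', 10, 4))
    # [('obj0', 0, 3), ('obj1', 3, 3), ('obj2', 6, 3), ('obj3', 9, 1)]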
|
+ def _RunReadThruTests(self, use_file=False):
|
+ """Runs read throughput tests."""
|
+ test_name = 'read_throughput_file' if use_file else 'read_throughput'
|
+ file_io_string = 'with file I/O' if use_file else ''
|
+ self.logger.info(
|
+ '\nRunning read throughput tests %s (%s objects of size %s)' %
|
+ (file_io_string, self.num_objects,
|
+ MakeHumanReadable(self.thru_filesize)))
|
+ self._WarnIfLargeData()
|
+
|
+ self.results[test_name] = {'file_size': self.thru_filesize,
|
+ 'processes': self.processes,
|
+ 'threads': self.threads,
|
+ 'parallelism': self.parallel_strategy
|
+ }
|
+
|
+ # Copy the file(s) to the test bucket, and also get the serialization data
|
+ # so that we can pass it to download.
|
+ if use_file:
|
+ # For test with file I/O use N files on disk to preserve seek performance.
|
+ file_names = self.thru_file_names
|
+ object_names = self.thru_object_names
|
+ serialization_data = []
|
+ for i in range(self.num_objects):
|
+ self.temporary_objects.add(self.thru_object_names[i])
|
+ if self.WTHRU_FILE in self.diag_tests:
|
+ # If we ran the WTHRU_FILE test, then the objects already exist.
|
+ obj_metadata = self.gsutil_api.GetObjectMetadata(
|
+ self.bucket_url.bucket_name, self.thru_object_names[i],
|
+ fields=['size', 'mediaLink'], provider=self.bucket_url.scheme)
|
+ else:
|
+ obj_metadata = self.Upload(self.thru_file_names[i],
|
+ self.thru_object_names[i], use_file)
|
+
|
+ # File overwrite causes performance issues with sliced downloads.
|
+ # Delete the file and reopen it for download. This matches what a real
|
+ # download would look like.
|
+ os.unlink(self.thru_file_names[i])
|
+ open(self.thru_file_names[i], 'ab').close()
|
+ serialization_data.append(GetDownloadSerializationData(obj_metadata))
|
else:
|
- args = ([(thru_url.bucket_name, thru_url.object_name, serialization_data)]
|
- * self.num_iterations)
|
- self.logger.addFilter(self._CpFilter())
|
+ # For in-memory test only use one file but copy it num_objects times, to
|
+ # allow scalability in num_objects.
|
+ self.temporary_objects.add(self.mem_thru_object_name)
|
+ obj_metadata = self.Upload(self.mem_thru_file_name,
|
+ self.mem_thru_object_name, use_file)
|
+ file_names = None
|
+ object_names = [self.mem_thru_object_name] * self.num_objects
|
+ serialization_data = (
|
+ [GetDownloadSerializationData(obj_metadata)] * self.num_objects)
|
+
|
+ # Warmup the TCP connection.
|
+ warmup_obj_name = os.path.basename(self.tcp_warmup_file)
|
+ self.temporary_objects.add(warmup_obj_name)
|
+ self.Upload(self.tcp_warmup_file, warmup_obj_name)
|
+ self.Download(warmup_obj_name)
|
|
- t0 = time.time()
|
- self.Apply(_DownloadWrapper,
|
- args,
|
- _PerfdiagExceptionHandler,
|
- arg_checker=DummyArgChecker,
|
- parallel_operations_override=True,
|
- process_count=self.processes,
|
- thread_count=self.threads)
|
- t1 = time.time()
|
- time_took = t1 - t0
|
+ t0 = time.time()
|
+ if self.processes == 1 and self.threads == 1:
|
+ for i in range(self.num_objects):
|
+ file_name = file_names[i] if use_file else None
|
+ self.Download(object_names[i], file_name, serialization_data[i])
|
+ else:
|
+ if self.parallel_strategy in (self.FAN, self.BOTH):
|
+ need_to_slice = (self.parallel_strategy == self.BOTH)
|
+ self.PerformFannedDownload(need_to_slice, object_names, file_names,
|
+ serialization_data)
|
+ elif self.parallel_strategy == self.SLICE:
|
+ for i in range(self.num_objects):
|
+ file_name = file_names[i] if use_file else None
|
+ self.PerformSlicedDownload(
|
+ object_names[i], file_name, serialization_data[i])
|
+ t1 = time.time()
|
|
- total_bytes_copied = self.thru_filesize * self.num_iterations
|
+ time_took = t1 - t0
|
+ total_bytes_copied = self.thru_filesize * self.num_objects
|
bytes_per_second = total_bytes_copied / time_took
|
|
- self.results['read_throughput']['time_took'] = time_took
|
- self.results['read_throughput']['total_bytes_copied'] = total_bytes_copied
|
- self.results['read_throughput']['bytes_per_second'] = bytes_per_second
|
+ self.results[test_name]['time_took'] = time_took
|
+ self.results[test_name]['total_bytes_copied'] = total_bytes_copied
|
+ self.results[test_name]['bytes_per_second'] = bytes_per_second
|
|
- def _RunWriteThruTests(self):
|
+ def _RunWriteThruTests(self, use_file=False):
|
"""Runs write throughput tests."""
|
+ test_name = 'write_throughput_file' if use_file else 'write_throughput'
|
+ file_io_string = 'with file I/O' if use_file else ''
|
self.logger.info(
|
- '\nRunning write throughput tests (%s iterations of size %s)' %
|
- (self.num_iterations, MakeHumanReadable(self.thru_filesize)))
|
-
|
- self.results['write_throughput'] = {'file_size': self.thru_filesize,
|
- 'num_copies': self.num_iterations,
|
- 'processes': self.processes,
|
- 'threads': self.threads}
|
-
|
- warmup_url = self.bucket_url.Clone()
|
- warmup_url.object_name = os.path.basename(self.tcp_warmup_file)
|
- warmup_target = StorageUrlToUploadObjectMetadata(warmup_url)
|
- self.test_object_names.add(warmup_url.object_name)
|
-
|
- thru_url = self.bucket_url.Clone()
|
- thru_url.object_name = os.path.basename(self.thru_local_file)
|
- thru_target = StorageUrlToUploadObjectMetadata(thru_url)
|
- thru_tuples = []
|
- for i in xrange(self.num_iterations):
|
- # Create a unique name for each uploaded object. Otherwise,
|
- # the XML API would fail when trying to non-atomically get metadata
|
- # for the object that gets blown away by the overwrite.
|
- remote_object_name = thru_target.name + str(i)
|
- self.test_object_names.add(remote_object_name)
|
- thru_tuples.append(UploadObjectTuple(thru_target.bucket,
|
- remote_object_name,
|
- filepath=self.thru_local_file))
|
-
|
- if self.processes == 1 and self.threads == 1:
|
- # Warm up the TCP connection.
|
- def _Warmup():
|
- self.gsutil_api.UploadObject(
|
- cStringIO.StringIO(self.file_contents[self.tcp_warmup_file]),
|
- warmup_target, provider=self.provider, size=self.thru_filesize,
|
- fields=['name'])
|
- self._RunOperation(_Warmup)
|
-
|
- times = []
|
-
|
- for i in xrange(self.num_iterations):
|
- thru_tuple = thru_tuples[i]
|
- def _Upload():
|
- """Uploads the write throughput measurement object."""
|
- upload_target = apitools_messages.Object(
|
- bucket=thru_tuple.bucket_name, name=thru_tuple.object_name,
|
- md5Hash=thru_tuple.md5)
|
- io_fp = cStringIO.StringIO(self.file_contents[self.thru_local_file])
|
- t0 = time.time()
|
- if self.thru_filesize < ResumableThreshold():
|
- self.gsutil_api.UploadObject(
|
- io_fp, upload_target, provider=self.provider,
|
- size=self.thru_filesize, fields=['name'])
|
- else:
|
- self.gsutil_api.UploadObjectResumable(
|
- io_fp, upload_target, provider=self.provider,
|
- size=self.thru_filesize, fields=['name'],
|
- tracker_callback=_DummyTrackerCallback)
|
-
|
- t1 = time.time()
|
- times.append(t1 - t0)
|
+ '\nRunning write throughput tests %s (%s objects of size %s)' %
|
+ (file_io_string, self.num_objects,
|
+ MakeHumanReadable(self.thru_filesize)))
|
+ self._WarnIfLargeData()
|
+
|
+ self.results[test_name] = {'file_size': self.thru_filesize,
|
+ 'processes': self.processes,
|
+ 'threads': self.threads,
|
+ 'parallelism': self.parallel_strategy}
|
+
|
+ # Warmup the TCP connection.
|
+ warmup_obj_name = os.path.basename(self.tcp_warmup_file)
|
+ self.temporary_objects.add(warmup_obj_name)
|
+ self.Upload(self.tcp_warmup_file, warmup_obj_name)
|
+
|
+ if use_file:
|
+ # For test with file I/O use N files on disk to preserve seek performance.
|
+ file_names = self.thru_file_names
|
+ object_names = self.thru_object_names
|
+ else:
|
+ # For in-memory test only use one file but copy it num_objects times, to
|
+ # allow for scalability in num_objects.
|
+ file_names = [self.mem_thru_file_name] * self.num_objects
|
+ object_names = (
|
+ [self.mem_thru_object_name + str(i) for i in range(self.num_objects)])
|
|
- self._RunOperation(_Upload)
|
- time_took = sum(times)
|
+ for object_name in object_names:
|
+ self.temporary_objects.add(object_name)
|
|
+ t0 = time.time()
|
+ if self.processes == 1 and self.threads == 1:
|
+ for i in range(self.num_objects):
|
+ self.Upload(file_names[i], object_names[i], use_file)
|
else:
|
- args = thru_tuples
|
- t0 = time.time()
|
- self.Apply(_UploadWrapper,
|
- args,
|
- _PerfdiagExceptionHandler,
|
- arg_checker=DummyArgChecker,
|
- parallel_operations_override=True,
|
- process_count=self.processes,
|
- thread_count=self.threads)
|
- t1 = time.time()
|
- time_took = t1 - t0
|
+ if self.parallel_strategy in (self.FAN, self.BOTH):
|
+ need_to_slice = (self.parallel_strategy == self.BOTH)
|
+ self.PerformFannedUpload(need_to_slice, file_names, object_names,
|
+ use_file)
|
+ elif self.parallel_strategy == self.SLICE:
|
+ for i in range(self.num_objects):
|
+ self.PerformSlicedUpload(file_names[i], object_names[i], use_file)
|
+ t1 = time.time()
|
|
- total_bytes_copied = self.thru_filesize * self.num_iterations
|
+ time_took = t1 - t0
|
+ total_bytes_copied = self.thru_filesize * self.num_objects
|
bytes_per_second = total_bytes_copied / time_took
|
|
- self.results['write_throughput']['time_took'] = time_took
|
- self.results['write_throughput']['total_bytes_copied'] = total_bytes_copied
|
- self.results['write_throughput']['bytes_per_second'] = bytes_per_second
|
+ self.results[test_name]['time_took'] = time_took
|
+ self.results[test_name]['total_bytes_copied'] = total_bytes_copied
|
+ self.results[test_name]['bytes_per_second'] = bytes_per_second
|
|
def _RunListTests(self):
|
"""Runs eventual consistency listing latency tests."""
|
- self.results['listing'] = {'num_files': self.num_iterations}
|
+ self.results['listing'] = {'num_files': self.num_objects}
|
|
- # Generate N random object names to put in the bucket.
|
+ # Generate N random objects to put into the bucket.
|
list_prefix = 'gsutil-perfdiag-list-'
|
+ list_fpaths = []
|
list_objects = []
|
- for _ in xrange(self.num_iterations):
|
- list_object_name = u'%s%s' % (list_prefix, os.urandom(20).encode('hex'))
|
- self.test_object_names.add(list_object_name)
|
- list_objects.append(list_object_name)
|
+ args = []
|
+ for _ in xrange(self.num_objects):
|
+ fpath = self._MakeTempFile(0, mem_data=True, mem_metadata=True,
|
+ prefix=list_prefix)
|
+ list_fpaths.append(fpath)
|
+ object_name = os.path.basename(fpath)
|
+ list_objects.append(object_name)
|
+ args.append(FanUploadTuple(False, fpath, object_name, False))
|
+ self.temporary_objects.add(object_name)
|
|
# Add the objects to the bucket.
|
self.logger.info(
|
- '\nWriting %s objects for listing test...', self.num_iterations)
|
- empty_md5 = CalculateB64EncodedMd5FromContents(cStringIO.StringIO(''))
|
- args = [
|
- UploadObjectTuple(self.bucket_url.bucket_name, name, md5=empty_md5,
|
- contents='') for name in list_objects]
|
- self.Apply(_UploadWrapper, args, _PerfdiagExceptionHandler,
|
+ '\nWriting %s objects for listing test...', self.num_objects)
|
+
|
+ self.Apply(_UploadObject, args, _PerfdiagExceptionHandler,
|
arg_checker=DummyArgChecker)
|
|
list_latencies = []
|
@@ -743,7 +1050,7 @@ class PerfDiagCommand(Command):
|
"""Lists and returns objects in the bucket. Also records latency."""
|
t0 = time.time()
|
objects = list(self.gsutil_api.ListObjects(
|
- self.bucket_url.bucket_name, prefix=list_prefix, delimiter='/',
|
+ self.bucket_url.bucket_name, delimiter='/',
|
provider=self.provider, fields=['items/name']))
|
t1 = time.time()
|
list_latencies.append(t1 - t0)
|
@@ -751,7 +1058,7 @@ class PerfDiagCommand(Command):
|
|
self.logger.info(
|
'Listing bucket %s waiting for %s objects to appear...',
|
- self.bucket_url.bucket_name, self.num_iterations)
|
+ self.bucket_url.bucket_name, self.num_objects)
|
while expected_objects - found_objects:
|
def _ListAfterUpload():
|
names = _List()
|
@@ -771,14 +1078,15 @@ class PerfDiagCommand(Command):
|
'time_took': total_end_time - total_start_time,
|
}
|
|
+ args = [object_name for object_name in list_objects]
|
self.logger.info(
|
- 'Deleting %s objects for listing test...', self.num_iterations)
|
+ 'Deleting %s objects for listing test...', self.num_objects)
|
self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler,
|
arg_checker=DummyArgChecker)
|
|
self.logger.info(
|
'Listing bucket %s waiting for %s objects to disappear...',
|
- self.bucket_url.bucket_name, self.num_iterations)
|
+ self.bucket_url.bucket_name, self.num_objects)
|
list_latencies = []
|
files_seen = []
|
total_start_time = time.time()
|
@@ -802,45 +1110,102 @@ class PerfDiagCommand(Command):
|
'time_took': total_end_time - total_start_time,
|
}
|
|
- def Upload(self, thru_tuple, thread_state=None):
|
- gsutil_api = GetCloudApiInstance(self, thread_state)
|
-
|
- md5hash = thru_tuple.md5
|
- contents = thru_tuple.contents
|
- if thru_tuple.filepath:
|
- md5hash = self.file_md5s[thru_tuple.filepath]
|
- contents = self.file_contents[thru_tuple.filepath]
|
-
|
- upload_target = apitools_messages.Object(
|
- bucket=thru_tuple.bucket_name, name=thru_tuple.object_name,
|
- md5Hash=md5hash)
|
- file_size = len(contents)
|
- if file_size < ResumableThreshold():
|
- gsutil_api.UploadObject(
|
- cStringIO.StringIO(contents), upload_target,
|
- provider=self.provider, size=file_size, fields=['name'])
|
- else:
|
- gsutil_api.UploadObjectResumable(
|
- cStringIO.StringIO(contents), upload_target,
|
- provider=self.provider, size=file_size, fields=['name'],
|
- tracker_callback=_DummyTrackerCallback)
|
+ def Upload(self, file_name, object_name, use_file=False, file_start=0,
|
+ file_size=None):
|
+ """Performs an upload to the test bucket.
|
|
- def Download(self, download_tuple, thread_state=None):
|
- """Downloads a file.
|
+ The file is uploaded to the bucket referred to by self.bucket_url, and has
|
+ name object_name.
|
|
Args:
|
- download_tuple: (bucket name, object name, serialization data for object).
|
- thread_state: gsutil Cloud API instance to use for the download.
|
+ file_name: The path to the local file, and the key to its entry in
|
+ temp_file_dict.
|
+ object_name: The name of the remote object.
|
+ use_file: If true, use disk I/O, otherwise read everything from memory.
|
+ file_start: The first byte in the file to upload to the object.
|
+ (only should be specified for sliced uploads)
|
+ file_size: The size of the file to upload.
|
+ (only should be specified for sliced uploads)
|
+
|
+ Returns:
|
+ Uploaded Object Metadata.
|
"""
|
- gsutil_api = GetCloudApiInstance(self, thread_state)
|
- gsutil_api.GetObjectMedia(
|
- download_tuple[0], download_tuple[1], self.discard_sink,
|
- provider=self.provider, serialization_data=download_tuple[2])
|
+ fp = None
|
+ if file_size is None:
|
+ file_size = temp_file_dict[file_name].size
|
|
- def Delete(self, thru_tuple, thread_state=None):
|
- gsutil_api = thread_state or self.gsutil_api
|
- gsutil_api.DeleteObject(
|
- thru_tuple.bucket_name, thru_tuple.object_name, provider=self.provider)
|
+ upload_url = self.bucket_url.Clone()
|
+ upload_url.object_name = object_name
|
+ upload_target = StorageUrlToUploadObjectMetadata(upload_url)
|
+
|
+ try:
|
+ if use_file:
|
+ fp = FilePart(file_name, file_start, file_size)
|
+ else:
|
+ data = temp_file_dict[file_name].data[file_start:file_start+file_size]
|
+ fp = cStringIO.StringIO(data)
|
+
|
+ def _InnerUpload():
|
+ if file_size < ResumableThreshold():
|
+ return self.gsutil_api.UploadObject(
|
+ fp, upload_target, provider=self.provider, size=file_size,
|
+ fields=['name', 'mediaLink', 'size'])
|
+ else:
|
+ return self.gsutil_api.UploadObjectResumable(
|
+ fp, upload_target, provider=self.provider, size=file_size,
|
+ fields=['name', 'mediaLink', 'size'],
|
+ tracker_callback=_DummyTrackerCallback)
|
+ return self._RunOperation(_InnerUpload)
|
+ finally:
|
+ if fp:
|
+ fp.close()
|
+
|
+ def Download(self, object_name, file_name=None, serialization_data=None,
|
+ start_byte=0, end_byte=None):
|
+ """Downloads an object from the test bucket.
|
+
|
+ Args:
|
+ object_name: The name of the object (in the test bucket) to download.
|
+ file_name: Optional file name to write downloaded data to. If None,
|
+ downloaded data is discarded immediately.
|
+ serialization_data: Optional serialization data, used so that we don't
|
+ have to get the metadata before downloading.
|
+ start_byte: The first byte in the object to download.
|
+ (only should be specified for sliced downloads)
|
+ end_byte: The last byte in the object to download.
|
+ (only should be specified for sliced downloads)
|
+ """
|
+ fp = None
|
+ try:
|
+ if file_name is not None:
|
+ fp = open(file_name, 'r+b')
|
+ fp.seek(start_byte)
|
+ else:
|
+ fp = self.discard_sink
|
+
|
+ def _InnerDownload():
|
+ self.gsutil_api.GetObjectMedia(
|
+ self.bucket_url.bucket_name, object_name, fp,
|
+ provider=self.provider, start_byte=start_byte, end_byte=end_byte,
|
+ serialization_data=serialization_data)
|
+ self._RunOperation(_InnerDownload)
|
+ finally:
|
+ if fp:
|
+ fp.close()
|
+
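To illustrate how the two helpers above are typically invoked from within PerfDiagCommand methods (a fragment, not standalone code; the file and object names are hypothetical, and the file must already be registered in temp_file_dict):

    # Whole-object round trip through local disk.
    obj_metadata = self.Upload('/tmp/perfdiag/file0.bin', 'file0.bin',
                               use_file=True)
    serialization_data = GetDownloadSerializationData(obj_metadata)
    self.Download('file0.bin', '/tmp/perfdiag/file0.bin', serialization_data)

    # Second of four 256 KiB slices of a 1 MiB sliced transfer.
    self.Upload('/tmp/perfdiag/file0.bin', 'file0.bin1', use_file=True,
                file_start=262144, file_size=262144)
    self.Download('file0.bin', '/tmp/perfdiag/file0.bin', serialization_data,
                  start_byte=262144, end_byte=524287)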
|
+ def Delete(self, object_name):
|
+ """Deletes an object from the test bucket.
|
+
|
+ Args:
|
+ object_name: The name of the object to delete.
|
+ """
|
+ try:
|
+ def _InnerDelete():
|
+ self.gsutil_api.DeleteObject(self.bucket_url.bucket_name, object_name,
|
+ provider=self.provider)
|
+ self._RunOperation(_InnerDelete)
|
+ except NotFoundException:
|
+ pass
|
|
def _GetDiskCounters(self):
|
"""Retrieves disk I/O statistics for all disks.
|
@@ -966,7 +1331,7 @@ class PerfDiagCommand(Command):
|
sysinfo['ip_address'] = ''
|
# Record the temporary directory used since it can affect performance, e.g.
|
# when on a networked filesystem.
|
- sysinfo['tempdir'] = tempfile.gettempdir()
|
+ sysinfo['tempdir'] = self.directory
|
|
# Produces an RFC 2822 compliant GMT timestamp.
|
sysinfo['gmt_timestamp'] = time.strftime('%a, %d %b %Y %H:%M:%S +0000',
|
@@ -1178,12 +1543,27 @@ class PerfDiagCommand(Command):
|
print 'Write Throughput'.center(78)
|
print '-' * 78
|
write_thru = self.results['write_throughput']
|
- print 'Copied a %s file %d times for a total transfer size of %s.' % (
|
+ print 'Copied %s %s file(s) for a total transfer size of %s.' % (
|
+ self.num_objects,
|
MakeHumanReadable(write_thru['file_size']),
|
- write_thru['num_copies'],
|
MakeHumanReadable(write_thru['total_bytes_copied']))
|
print 'Write throughput: %s/s.' % (
|
MakeBitsHumanReadable(write_thru['bytes_per_second'] * 8))
|
+ print 'Parallelism strategy: %s' % write_thru['parallelism']
|
+
|
+ if 'write_throughput_file' in self.results:
|
+ print
|
+ print '-' * 78
|
+ print 'Write Throughput With File I/O'.center(78)
|
+ print '-' * 78
|
+ write_thru_file = self.results['write_throughput_file']
|
+ print 'Copied %s %s file(s) for a total transfer size of %s.' % (
|
+ self.num_objects,
|
+ MakeHumanReadable(write_thru_file['file_size']),
|
+ MakeHumanReadable(write_thru_file['total_bytes_copied']))
|
+ print 'Write throughput: %s/s.' % (
|
+ MakeBitsHumanReadable(write_thru_file['bytes_per_second'] * 8))
|
+ print 'Parallelism strategy: %s' % write_thru_file['parallelism']
|
|
if 'read_throughput' in self.results:
|
print
|
@@ -1191,12 +1571,27 @@ class PerfDiagCommand(Command):
|
print 'Read Throughput'.center(78)
|
print '-' * 78
|
read_thru = self.results['read_throughput']
|
- print 'Copied a %s file %d times for a total transfer size of %s.' % (
|
+ print 'Copied %s %s file(s) for a total transfer size of %s.' % (
|
+ self.num_objects,
|
MakeHumanReadable(read_thru['file_size']),
|
- read_thru['num_times'],
|
MakeHumanReadable(read_thru['total_bytes_copied']))
|
print 'Read throughput: %s/s.' % (
|
MakeBitsHumanReadable(read_thru['bytes_per_second'] * 8))
|
+ print 'Parallelism strategy: %s' % read_thru['parallelism']
|
+
|
+ if 'read_throughput_file' in self.results:
|
+ print
|
+ print '-' * 78
|
+ print 'Read Throughput With File I/O'.center(78)
|
+ print '-' * 78
|
+ read_thru_file = self.results['read_throughput_file']
|
+ print 'Copied %s %s file(s) for a total transfer size of %s.' % (
|
+ self.num_objects,
|
+ MakeHumanReadable(read_thru_file['file_size']),
|
+ MakeHumanReadable(read_thru_file['total_bytes_copied']))
|
+ print 'Read throughput: %s/s.' % (
|
+ MakeBitsHumanReadable(read_thru_file['bytes_per_second'] * 8))
|
+ print 'Parallelism strategy: %s' % read_thru_file['parallelism']
|
|
if 'listing' in self.results:
|
print
|
@@ -1389,15 +1784,23 @@ class PerfDiagCommand(Command):
|
def _ParseArgs(self):
|
"""Parses arguments for perfdiag command."""
|
# From -n.
|
- self.num_iterations = 5
|
+ self.num_objects = 5
|
# From -c.
|
self.processes = 1
|
# From -k.
|
self.threads = 1
|
+ # From -p
|
+ self.parallel_strategy = None
|
+ # From -y
|
+ self.num_slices = 4
|
# From -s.
|
self.thru_filesize = 1048576
|
+ # From -d.
|
+ self.directory = tempfile.gettempdir()
|
+ # Keep track of whether or not to delete the directory upon completion.
|
+ self.delete_directory = False
|
# From -t.
|
- self.diag_tests = self.DEFAULT_DIAG_TESTS
|
+ self.diag_tests = set(self.DEFAULT_DIAG_TESTS)
|
# From -o.
|
self.output_file = None
|
# From -i.
|
@@ -1408,7 +1811,7 @@ class PerfDiagCommand(Command):
|
if self.sub_opts:
|
for o, a in self.sub_opts:
|
if o == '-n':
|
- self.num_iterations = self._ParsePositiveInteger(
|
+ self.num_objects = self._ParsePositiveInteger(
|
a, 'The -n parameter must be a positive integer.')
|
if o == '-c':
|
self.processes = self._ParsePositiveInteger(
|
@@ -1416,21 +1819,32 @@ class PerfDiagCommand(Command):
|
if o == '-k':
|
self.threads = self._ParsePositiveInteger(
|
a, 'The -k parameter must be a positive integer.')
|
+ if o == '-p':
|
+ if a.lower() in self.PARALLEL_STRATEGIES:
|
+ self.parallel_strategy = a.lower()
|
+ else:
|
+ raise CommandException(
|
+ "'%s' is not a valid parallelism strategy." % a)
|
+ if o == '-y':
|
+ self.num_slices = self._ParsePositiveInteger(
|
+ a, 'The -y parameter must be a positive integer.')
|
if o == '-s':
|
try:
|
self.thru_filesize = HumanReadableToBytes(a)
|
except ValueError:
|
raise CommandException('Invalid -s parameter.')
|
- if self.thru_filesize > (20 * 1024 ** 3): # Max 20 GiB.
|
- raise CommandException(
|
- 'Maximum throughput file size parameter (-s) is 20 GiB.')
|
+ if o == '-d':
|
+ self.directory = a
|
+ if not os.path.exists(self.directory):
|
+ self.delete_directory = True
|
+ os.makedirs(self.directory)
|
if o == '-t':
|
- self.diag_tests = []
|
+ self.diag_tests = set()
|
for test_name in a.strip().split(','):
|
if test_name.lower() not in self.ALL_DIAG_TESTS:
|
raise CommandException("List of test names (-t) contains invalid "
|
"test name '%s'." % test_name)
|
- self.diag_tests.append(test_name)
|
+ self.diag_tests.add(test_name)
|
if o == '-m':
|
pieces = a.split(':')
|
if len(pieces) != 2:
|
@@ -1452,15 +1866,39 @@ class PerfDiagCommand(Command):
|
raise CommandException("Could not decode input file (-i): '%s'." %
|
a)
|
return
|
+
|
+ # If parallelism is specified, default parallelism strategy to fan.
|
+ if (self.processes > 1 or self.threads > 1) and not self.parallel_strategy:
|
+ self.parallel_strategy = self.FAN
|
+ elif self.processes == 1 and self.threads == 1 and self.parallel_strategy:
|
+ raise CommandException(
|
+ 'Cannot specify parallelism strategy (-p) without also specifying '
|
+ 'multiple threads and/or processes (-c and/or -k).')
|
+
|
if not self.args:
|
self.RaiseWrongNumberOfArgumentsException()
|
|
self.bucket_url = StorageUrlFromString(self.args[0])
|
self.provider = self.bucket_url.scheme
|
- if not (self.bucket_url.IsCloudUrl() and self.bucket_url.IsBucket()):
|
+ if not (self.bucket_url.IsCloudUrl() and self.bucket_url.IsBucket()):
|
raise CommandException('The perfdiag command requires a URL that '
|
'specifies a bucket.\n"%s" is not '
|
'valid.' % self.args[0])
|
+
|
+ if (self.thru_filesize > HumanReadableToBytes('2GiB') and
|
+ (self.RTHRU in self.diag_tests or self.WTHRU in self.diag_tests)):
|
+ raise CommandException(
|
+ 'For in-memory tests maximum file size is 2GiB. For larger file '
|
+ 'sizes, specify rthru_file and/or wthru_file with the -t option.')
|
+
|
+ perform_slice = self.parallel_strategy in (self.SLICE, self.BOTH)
|
+ slice_not_available = (
|
+ self.provider == 's3' and self.diag_tests.intersection((self.WTHRU,
|
+ self.WTHRU_FILE)))
|
+ if perform_slice and slice_not_available:
|
+ raise CommandException('Sliced uploads are not available for s3. '
|
+ 'Use -p fan or sequential uploads for s3.')
|
+
|
# Ensure the bucket exists.
|
self.gsutil_api.GetBucket(self.bucket_url.bucket_name,
|
provider=self.bucket_url.scheme,
|
@@ -1487,12 +1925,14 @@ class PerfDiagCommand(Command):
|
'Base bucket URI: %s\n'
|
'Number of processes: %d\n'
|
'Number of threads: %d\n'
|
+ 'Parallelism strategy: %s\n'
|
'Throughput file size: %s\n'
|
'Diagnostics to run: %s',
|
- self.num_iterations,
|
+ self.num_objects,
|
self.bucket_url,
|
self.processes,
|
self.threads,
|
+ self.parallel_strategy,
|
MakeHumanReadable(self.thru_filesize),
|
(', '.join(self.diag_tests)))
|
|
@@ -1516,6 +1956,12 @@ class PerfDiagCommand(Command):
|
self._RunLatencyTests()
|
if self.RTHRU in self.diag_tests:
|
self._RunReadThruTests()
|
+ # Run WTHRU_FILE before RTHRU_FILE. If data is created in WTHRU_FILE it
|
+ # will be used in RTHRU_FILE to save time and bandwidth.
|
+ if self.WTHRU_FILE in self.diag_tests:
|
+ self._RunWriteThruTests(use_file=True)
|
+ if self.RTHRU_FILE in self.diag_tests:
|
+ self._RunReadThruTests(use_file=True)
|
if self.WTHRU in self.diag_tests:
|
self._RunWriteThruTests()
|
if self.LIST in self.diag_tests:
|
@@ -1535,6 +1981,7 @@ class PerfDiagCommand(Command):
|
self.results['gsutil_version'] = gslib.VERSION
|
self.results['boto_version'] = boto.__version__
|
|
+ self._TearDown()
|
self._DisplayResults()
|
finally:
|
# TODO: Install signal handlers so this is performed in response to a
|
@@ -1545,40 +1992,6 @@ class PerfDiagCommand(Command):
|
return 0
|
|
|
-class UploadObjectTuple(object):
|
- """Picklable tuple with necessary metadata for an insert object call."""
|
-
|
- def __init__(self, bucket_name, object_name, filepath=None, md5=None,
|
- contents=None):
|
- """Create an upload tuple.
|
-
|
- Args:
|
- bucket_name: Name of the bucket to upload to.
|
- object_name: Name of the object to upload to.
|
- filepath: A file path located in self.file_contents and self.file_md5s.
|
- md5: The MD5 hash of the object being uploaded.
|
- contents: The contents of the file to be uploaded.
|
-
|
- Note: (contents + md5) and filepath are mutually exlusive. You may specify
|
- one or the other, but not both.
|
- Note: If one of contents or md5 are specified, they must both be specified.
|
-
|
- Raises:
|
- InvalidArgument: if the arguments are invalid.
|
- """
|
- self.bucket_name = bucket_name
|
- self.object_name = object_name
|
- self.filepath = filepath
|
- self.md5 = md5
|
- self.contents = contents
|
- if filepath and (md5 or contents is not None):
|
- raise InvalidArgument(
|
- 'Only one of filepath or (md5 + contents) may be specified.')
|
- if not filepath and (not md5 or contents is None):
|
- raise InvalidArgument(
|
- 'Both md5 and contents must be specified.')
|
-
|
-
|
def StorageUrlToUploadObjectMetadata(storage_url):
|
if storage_url.IsCloudUrl() and storage_url.IsObject():
|
upload_target = apitools_messages.Object()
|
|