Chromium Code Reviews

Unified Diff: tools/telemetry/third_party/gsutilz/gslib/commands/perfdiag.py

Issue 1376593003: Roll gsutil version to 4.15. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 3 months ago
Index: tools/telemetry/third_party/gsutilz/gslib/commands/perfdiag.py
diff --git a/tools/telemetry/third_party/gsutilz/gslib/commands/perfdiag.py b/tools/telemetry/third_party/gsutilz/gslib/commands/perfdiag.py
index d88eae78c1242e823b9be169e8f2f9f103a15a23..f95545bdd85537ef30ed7db418ed6a1760492bc4 100644
--- a/tools/telemetry/third_party/gsutilz/gslib/commands/perfdiag.py
+++ b/tools/telemetry/third_party/gsutilz/gslib/commands/perfdiag.py
@@ -18,6 +18,7 @@ from __future__ import absolute_import
import calendar
from collections import defaultdict
+from collections import namedtuple
import contextlib
import cStringIO
import datetime
@@ -41,17 +42,21 @@ import boto.gs.connection
import gslib
from gslib.cloud_api import NotFoundException
from gslib.cloud_api import ServiceException
-from gslib.cloud_api_helper import GetDownloadSerializationDict
+from gslib.cloud_api_helper import GetDownloadSerializationData
from gslib.command import Command
from gslib.command import DummyArgChecker
from gslib.command_argument import CommandArgument
from gslib.commands import config
from gslib.cs_api_map import ApiSelector
from gslib.exception import CommandException
+from gslib.file_part import FilePart
from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
from gslib.storage_url import StorageUrlFromString
from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
+from gslib.util import CheckFreeSpace
+from gslib.util import DivideAndCeil
from gslib.util import GetCloudApiInstance
+from gslib.util import GetFileSize
from gslib.util import GetMaxRetryDelay
from gslib.util import HumanReadableToBytes
from gslib.util import IS_LINUX
@@ -60,11 +65,11 @@ from gslib.util import MakeHumanReadable
from gslib.util import Percentile
from gslib.util import ResumableThreshold
-
_SYNOPSIS = """
gsutil perfdiag [-i in.json]
- gsutil perfdiag [-o out.json] [-n iterations] [-c processes]
- [-k threads] [-s size] [-t tests] url...
+ gsutil perfdiag [-o out.json] [-n objects] [-c processes]
+ [-k threads] [-p parallelism type] [-y slices] [-s size] [-d directory]
+ [-t tests] url...
"""
_DETAILED_HELP_TEXT = ("""
@@ -99,9 +104,8 @@ _DETAILED_HELP_TEXT = ("""
<B>OPTIONS</B>
- -n Sets the number of iterations performed when downloading and
- uploading files during latency and throughput tests. Defaults to
- 5.
+ -n Sets the number of objects to use when downloading and uploading
+ files during tests. Defaults to 5.
-c Sets the number of processes to use while running throughput
experiments. The default value is 1.
@@ -110,20 +114,55 @@ _DETAILED_HELP_TEXT = ("""
throughput experiments. Each process will receive an equal number
of threads. The default value is 1.
- -s Sets the size (in bytes) of the test file used to perform read
- and write throughput tests. The default is 1 MiB. This can also
- be specified using byte suffixes such as 500K or 1M. Note: these
- values are interpreted as multiples of 1024 (K=1024, M=1024*1024,
- etc.)
+ Note: All specified threads and processes will be created, but may
+             not be saturated with work if too few objects (specified with -n)
+ and too few components (specified with -y) are specified.
+
+ -p Sets the type of parallelism to be used (only applicable when
+ threads or processes are specified and threads * processes > 1).
+ The default is to use fan. Must be one of the following:
+
+ fan
+ Use one thread per object. This is akin to using gsutil -m cp,
+ with sliced object download / parallel composite upload
+ disabled.
+
+ slice
+ Use Y (specified with -y) threads for each object, transferring
+ one object at a time. This is akin to using parallel object
+ download / parallel composite upload, without -m. Sliced
+ uploads not supported for s3.
+
+ both
+ Use Y (specified with -y) threads for each object, transferring
+ multiple objects at a time. This is akin to simultaneously
+ using sliced object download / parallel composite upload and
+ gsutil -m cp. Sliced uploads not supported for s3.
+
+ -y Sets the number of slices to divide each file/object into while
+ transferring data. Only applicable with the slice (or both)
+ parallelism type. The default is 4 slices.
+
+ -s Sets the size (in bytes) for each of the N (set with -n) objects
+ used in the read and write throughput tests. The default is 1 MiB.
+ This can also be specified using byte suffixes such as 500K or 1M.
+ Note: these values are interpreted as multiples of 1024 (K=1024,
+ M=1024*1024, etc.)
+ Note: If rthru_file or wthru_file are performed, N (set with -n)
+ times as much disk space as specified will be required for the
+ operation.
+
+ -d Sets the directory to store temporary local files in. If not
+ specified, a default temporary directory will be used.
-t Sets the list of diagnostic tests to perform. The default is to
- run all diagnostic tests. Must be a comma-separated list
- containing one or more of the following:
+ run the lat, rthru, and wthru diagnostic tests. Must be a
+ comma-separated list containing one or more of the following:
lat
- Runs N iterations (set with -n) of writing the file,
- retrieving its metadata, reading the file, and deleting
- the file. Records the latency of each operation.
+ For N (set with -n) objects, write the object, retrieve its
+ metadata, read the object, and finally delete the object.
+ Record the latency of each operation.
list
Write N (set with -n) objects to the bucket, record how long
@@ -136,15 +175,22 @@ _DETAILED_HELP_TEXT = ("""
Runs N (set with -n) read operations, with at most C
(set with -c) reads outstanding at any given time.
+ rthru_file
+ The same as rthru, but simultaneously writes data to the disk,
+ to gauge the performance impact of the local disk on downloads.
+
wthru
Runs N (set with -n) write operations, with at most C
(set with -c) writes outstanding at any given time.
+ wthru_file
+ The same as wthru, but simultaneously reads data from the disk,
+ to gauge the performance impact of the local disk on uploads.
+
-m Adds metadata to the result JSON file. Multiple -m values can be
specified. Example:
- gsutil perfdiag -m "key1:value1" -m "key2:value2" \
- gs://bucketname/
+ gsutil perfdiag -m "key1:val1" -m "key2:val2" gs://bucketname
Each metadata key will be added to the top-level "metadata"
dictionary in the output JSON file.
@@ -178,6 +224,41 @@ _DETAILED_HELP_TEXT = ("""
this information will be sent to Google unless you choose to send it.
""")
+FileDataTuple = namedtuple(
+ 'FileDataTuple',
+ 'size md5 data')
+
+# Describes one object in a fanned download. If need_to_slice is specified as
+# True, the object should be downloaded with the slice strategy. Other field
+# names are the same as documented in PerfDiagCommand.Download.
+FanDownloadTuple = namedtuple(
+ 'FanDownloadTuple',
+ 'need_to_slice object_name file_name serialization_data')
+
+# Describes one slice in a sliced download.
+# Field names are the same as documented in PerfDiagCommand.Download.
+SliceDownloadTuple = namedtuple(
+ 'SliceDownloadTuple',
+ 'object_name file_name serialization_data start_byte end_byte')
+
+# Describes one file in a fanned upload. If need_to_slice is specified as
+# True, the file should be uploaded with the slice strategy. Other field
+# names are the same as documented in PerfDiagCommand.Upload.
+FanUploadTuple = namedtuple(
+ 'FanUploadTuple',
+ 'need_to_slice file_name object_name use_file')
+
+# Describes one slice in a sliced upload.
+# Field names are the same as documented in PerfDiagCommand.Upload.
+SliceUploadTuple = namedtuple(
+ 'SliceUploadTuple',
+ 'file_name object_name use_file file_start file_size')
+
+# Dict storing file_path:FileDataTuple for each temporary file used by
+# perfdiag. This data should be kept outside of the PerfDiagCommand class
+# since calls to Apply will make copies of all member data.
+temp_file_dict = {}
+
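
A minimal sketch of how these tuples and temp_file_dict fit together; the path, size, and hash below are illustrative values, not from this patch:

  # Record metadata for a temporary file, then build one fanned-upload work
  # item for gsutil's Apply machinery (see the helper functions below).
  fpath = '/tmp/gsutil_test_file_abc.bin'        # hypothetical temp file
  temp_file_dict[fpath] = FileDataTuple(size=1048576, md5='<b64 md5>', data=None)
  work_item = FanUploadTuple(need_to_slice=False, file_name=fpath,
                             object_name='gsutil_test_file_abc.bin',
                             use_file=True)
  # need_to_slice=True would instead route the file through PerformSlicedUpload.
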
class Error(Exception):
"""Base exception class for this module."""
@@ -189,16 +270,73 @@ class InvalidArgument(Error):
pass
-def _DownloadWrapper(cls, arg, thread_state=None):
- cls.Download(arg, thread_state=thread_state)
+def _DownloadObject(cls, args, thread_state=None):
+ """Function argument to apply for performing fanned parallel downloads.
+ Args:
+ cls: The calling PerfDiagCommand class instance.
+ args: A FanDownloadTuple object describing this download.
+ thread_state: gsutil Cloud API instance to use for the operation.
+ """
+ cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
+ if args.need_to_slice:
+ cls.PerformSlicedDownload(args.object_name, args.file_name,
+ args.serialization_data)
+ else:
+ cls.Download(args.object_name, args.file_name, args.serialization_data)
-def _UploadWrapper(cls, arg, thread_state=None):
- cls.Upload(arg, thread_state=thread_state)
+def _DownloadSlice(cls, args, thread_state=None):
+ """Function argument to apply for performing sliced downloads.
-def _DeleteWrapper(cls, arg, thread_state=None):
- cls.Delete(arg, thread_state=thread_state)
+ Args:
+ cls: The calling PerfDiagCommand class instance.
+ args: A SliceDownloadTuple object describing this download.
+ thread_state: gsutil Cloud API instance to use for the operation.
+ """
+ cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
+ cls.Download(args.object_name, args.file_name, args.serialization_data,
+ args.start_byte, args.end_byte)
+
+
+def _UploadObject(cls, args, thread_state=None):
+ """Function argument to apply for performing fanned parallel uploads.
+
+ Args:
+ cls: The calling PerfDiagCommand class instance.
+ args: A FanUploadTuple object describing this upload.
+ thread_state: gsutil Cloud API instance to use for the operation.
+ """
+ cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
+ if args.need_to_slice:
+ cls.PerformSlicedUpload(args.file_name, args.object_name, args.use_file)
+ else:
+ cls.Upload(args.file_name, args.object_name, args.use_file)
+
+
+def _UploadSlice(cls, args, thread_state=None):
+ """Function argument to apply for performing sliced parallel uploads.
+
+ Args:
+ cls: The calling PerfDiagCommand class instance.
+ args: A SliceUploadTuple object describing this upload.
+ thread_state: gsutil Cloud API instance to use for the operation.
+ """
+ cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
+ cls.Upload(args.file_name, args.object_name, args.use_file,
+ args.file_start, args.file_size)
+
+
+def _DeleteWrapper(cls, object_name, thread_state=None):
+ """Function argument to apply for performing parallel object deletions.
+
+ Args:
+ cls: The calling PerfDiagCommand class instance.
+ object_name: The object name to delete from the test bucket.
+ thread_state: gsutil Cloud API instance to use for the operation.
+ """
+ cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
+ cls.Delete(object_name)
def _PerfdiagExceptionHandler(cls, e):
@@ -216,6 +354,9 @@ class DummyFile(object):
def write(self, *args, **kwargs): # pylint: disable=invalid-name
pass
+ def close(self): # pylint: disable=invalid-name
+ pass
+
# Many functions in perfdiag re-define a temporary function based on a
# variable from a loop, resulting in a false positive from the linter.
@@ -230,7 +371,7 @@ class PerfDiagCommand(Command):
usage_synopsis=_SYNOPSIS,
min_args=0,
max_args=1,
- supported_sub_args='n:c:k:s:t:m:i:o:',
+ supported_sub_args='n:c:k:p:y:s:d:t:m:i:o:',
file_url_ok=False,
provider_url_ok=False,
urls_start_arg=0,
@@ -253,7 +394,7 @@ class PerfDiagCommand(Command):
# Byte sizes to use for latency testing files.
# TODO: Consider letting the user specify these sizes with a configuration
# parameter.
- test_file_sizes = (
+ test_lat_file_sizes = (
0, # 0 bytes
1024, # 1 KiB
102400, # 100 KiB
@@ -262,15 +403,26 @@ class PerfDiagCommand(Command):
# Test names.
RTHRU = 'rthru'
+ RTHRU_FILE = 'rthru_file'
WTHRU = 'wthru'
+ WTHRU_FILE = 'wthru_file'
LAT = 'lat'
LIST = 'list'
+ # Parallelism strategies.
+ FAN = 'fan'
+ SLICE = 'slice'
+ BOTH = 'both'
+
# List of all diagnostic tests.
- ALL_DIAG_TESTS = (RTHRU, WTHRU, LAT, LIST)
+ ALL_DIAG_TESTS = (RTHRU, RTHRU_FILE, WTHRU, WTHRU_FILE, LAT, LIST)
+
# List of diagnostic tests to run by default.
DEFAULT_DIAG_TESTS = (RTHRU, WTHRU, LAT)
+ # List of parallelism strategies.
+ PARALLEL_STRATEGIES = (FAN, SLICE, BOTH)
+
# Google Cloud Storage XML API endpoint host.
XML_API_HOST = boto.config.get(
'Credentials', 'gs_host', boto.gs.connection.GSConnection.DefaultHost)
@@ -324,21 +476,58 @@ class PerfDiagCommand(Command):
"subprocess '%s'." % (p.returncode, ' '.join(cmd)))
return stdoutdata if return_output else p.returncode
+ def _WarnIfLargeData(self):
+ """Outputs a warning message if a large amount of data is being used."""
+ if self.num_objects * self.thru_filesize > HumanReadableToBytes('2GiB'):
+ self.logger.info('This is a large operation, and could take a while.')
+
+ def _MakeTempFile(self, file_size=0, mem_metadata=False,
+ mem_data=False, prefix='gsutil_test_file'):
+ """Creates a temporary file of the given size and returns its path.
+
+ Args:
+ file_size: The size of the temporary file to create.
+ mem_metadata: If true, store md5 and file size in memory at
+                    temp_file_dict[fpath].md5 and temp_file_dict[fpath].size.
+ mem_data: If true, store the file data in memory at
+ temp_file_dict[fpath].data
+ prefix: The prefix to use for the temporary file. Defaults to
+ gsutil_test_file.
+
+ Returns:
+ The file path of the created temporary file.
+ """
+ fd, fpath = tempfile.mkstemp(suffix='.bin', prefix=prefix,
+ dir=self.directory, text=False)
+ with os.fdopen(fd, 'wb') as fp:
+ random_bytes = os.urandom(min(file_size,
+ self.MAX_UNIQUE_RANDOM_BYTES))
+ total_bytes_written = 0
+ while total_bytes_written < file_size:
+ num_bytes = min(self.MAX_UNIQUE_RANDOM_BYTES,
+ file_size - total_bytes_written)
+ fp.write(random_bytes[:num_bytes])
+ total_bytes_written += num_bytes
+
+ if mem_metadata or mem_data:
+ with open(fpath, 'rb') as fp:
+ file_size = GetFileSize(fp) if mem_metadata else None
+ md5 = CalculateB64EncodedMd5FromContents(fp) if mem_metadata else None
+ data = fp.read() if mem_data else None
+ temp_file_dict[fpath] = FileDataTuple(file_size, md5, data)
+
+ self.temporary_files.add(fpath)
+ return fpath
+
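
A standalone sketch of the chunked random-byte write performed by _MakeTempFile above; the cap below is an assumption for illustration (the real limit is the class constant MAX_UNIQUE_RANDOM_BYTES):

  import os

  MAX_UNIQUE_RANDOM_BYTES = 5 * 1024 * 1024  # assumed cap, illustration only

  def write_random_file(path, file_size):
    # Generate one random chunk and repeat it, so memory stays bounded even
    # for very large files (mirrors the loop in _MakeTempFile).
    chunk = os.urandom(min(file_size, MAX_UNIQUE_RANDOM_BYTES))
    written = 0
    with open(path, 'wb') as fp:
      while written < file_size:
        n = min(MAX_UNIQUE_RANDOM_BYTES, file_size - written)
        fp.write(chunk[:n])
        written += n
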
def _SetUp(self):
"""Performs setup operations needed before diagnostics can be run."""
# Stores test result data.
self.results = {}
- # List of test files in a temporary location on disk for latency ops.
- self.latency_files = []
- # List of test objects to clean up in the test bucket.
- self.test_object_names = set()
- # Maps each test file path to its size in bytes.
- self.file_sizes = {}
- # Maps each test file to its contents as a string.
- self.file_contents = {}
- # Maps each test file to its MD5 hash.
- self.file_md5s = {}
+ # Set of file paths for local temporary files.
+ self.temporary_files = set()
+ # Set of names for test objects that exist in the test bucket.
+ self.temporary_objects = set()
# Total number of HTTP requests made.
self.total_requests = 0
# Total number of HTTP 5xx errors.
@@ -347,63 +536,86 @@ class PerfDiagCommand(Command):
self.error_responses_by_code = defaultdict(int)
# Total number of socket errors.
self.connection_breaks = 0
-
- def _MakeFile(file_size):
- """Creates a temporary file of the given size and returns its path."""
- fd, fpath = tempfile.mkstemp(suffix='.bin', prefix='gsutil_test_file',
- text=False)
- self.file_sizes[fpath] = file_size
- random_bytes = os.urandom(min(file_size, self.MAX_UNIQUE_RANDOM_BYTES))
- total_bytes = 0
- file_contents = ''
- while total_bytes < file_size:
- num_bytes = min(self.MAX_UNIQUE_RANDOM_BYTES, file_size - total_bytes)
- file_contents += random_bytes[:num_bytes]
- total_bytes += num_bytes
- self.file_contents[fpath] = file_contents
- with os.fdopen(fd, 'wb') as f:
- f.write(self.file_contents[fpath])
- with open(fpath, 'rb') as f:
- self.file_md5s[fpath] = CalculateB64EncodedMd5FromContents(f)
- return fpath
-
- # Create files for latency tests.
- for file_size in self.test_file_sizes:
- fpath = _MakeFile(file_size)
- self.latency_files.append(fpath)
-
- # Creating a file for warming up the TCP connection.
- self.tcp_warmup_file = _MakeFile(5 * 1024 * 1024) # 5 Mebibytes.
- # Remote file to use for TCP warmup.
- self.tcp_warmup_remote_file = (str(self.bucket_url) +
- os.path.basename(self.tcp_warmup_file))
-
- # Local file on disk for write throughput tests.
- self.thru_local_file = _MakeFile(self.thru_filesize)
+ # Boolean to prevent doing cleanup twice.
+ self.teardown_completed = False
+
+ # Create files for latency test.
+ if self.LAT in self.diag_tests:
+ self.latency_files = []
+ for file_size in self.test_lat_file_sizes:
+ fpath = self._MakeTempFile(file_size, mem_metadata=True, mem_data=True)
+ self.latency_files.append(fpath)
+
+ # Create files for throughput tests.
+ if self.diag_tests.intersection(
+ (self.RTHRU, self.WTHRU, self.RTHRU_FILE, self.WTHRU_FILE)):
+ # Create a file for warming up the TCP connection.
+ self.tcp_warmup_file = self._MakeTempFile(
+ 5 * 1024 * 1024, mem_metadata=True, mem_data=True)
+
+ # For in memory tests, throughput tests transfer the same object N times
+ # instead of creating N objects, in order to avoid excessive memory usage.
+ if self.diag_tests.intersection((self.RTHRU, self.WTHRU)):
+ self.mem_thru_file_name = self._MakeTempFile(
+ self.thru_filesize, mem_metadata=True, mem_data=True)
+ self.mem_thru_object_name = os.path.basename(self.mem_thru_file_name)
+
+    # For tests that use disk I/O, it is necessary to create N objects
+    # in order to properly measure the performance impact of seeks.
+ if self.diag_tests.intersection((self.RTHRU_FILE, self.WTHRU_FILE)):
+ # List of file names and corresponding object names to use for file
+ # throughput tests.
+ self.thru_file_names = []
+ self.thru_object_names = []
+
+ free_disk_space = CheckFreeSpace(self.directory)
+ if free_disk_space >= self.thru_filesize * self.num_objects:
+ self.logger.info('\nCreating %d local files each of size %s.'
+ % (self.num_objects,
+ MakeHumanReadable(self.thru_filesize)))
+ self._WarnIfLargeData()
+ for _ in range(self.num_objects):
+ file_name = self._MakeTempFile(self.thru_filesize,
+ mem_metadata=True)
+ self.thru_file_names.append(file_name)
+ self.thru_object_names.append(os.path.basename(file_name))
+ else:
+ raise CommandException(
+ 'Not enough free disk space for throughput files: '
+ '%s of disk space required, but only %s available.'
+ % (MakeHumanReadable(self.thru_filesize * self.num_objects),
+ MakeHumanReadable(free_disk_space)))
# Dummy file buffer to use for downloading that goes nowhere.
self.discard_sink = DummyFile()
+ # Filter out misleading progress callback output and the incorrect
+ # suggestion to use gsutil -m perfdiag.
+ self.logger.addFilter(self._PerfdiagFilter())
+
def _TearDown(self):
"""Performs operations to clean things up after performing diagnostics."""
- for fpath in self.latency_files + [self.thru_local_file,
- self.tcp_warmup_file]:
+ if not self.teardown_completed:
+ temp_file_dict.clear()
+
try:
- os.remove(fpath)
+ for fpath in self.temporary_files:
+ os.remove(fpath)
+ if self.delete_directory:
+ os.rmdir(self.directory)
except OSError:
pass
- for object_name in self.test_object_names:
-
- def _Delete():
- try:
- self.gsutil_api.DeleteObject(self.bucket_url.bucket_name,
- object_name,
- provider=self.provider)
- except NotFoundException:
- pass
-
- self._RunOperation(_Delete)
+ if self.threads > 1 or self.processes > 1:
+ args = [obj for obj in self.temporary_objects]
+ self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler,
+ arg_checker=DummyArgChecker,
+ parallel_operations_override=True,
+ process_count=self.processes, thread_count=self.threads)
+ else:
+ for object_name in self.temporary_objects:
+ self.Delete(object_name)
+ self.teardown_completed = True
@contextlib.contextmanager
def _Time(self, key, bucket):
@@ -478,12 +690,13 @@ class PerfDiagCommand(Command):
# Stores timing information for each category of operation.
self.results['latency'] = defaultdict(list)
- for i in range(self.num_iterations):
+ for i in range(self.num_objects):
self.logger.info('\nRunning latency iteration %d...', i+1)
for fpath in self.latency_files:
+ file_data = temp_file_dict[fpath]
url = self.bucket_url.Clone()
url.object_name = os.path.basename(fpath)
- file_size = self.file_sizes[fpath]
+ file_size = file_data.size
readable_file_size = MakeHumanReadable(file_size)
self.logger.info(
@@ -493,7 +706,7 @@ class PerfDiagCommand(Command):
upload_target = StorageUrlToUploadObjectMetadata(url)
def _Upload():
- io_fp = cStringIO.StringIO(self.file_contents[fpath])
+ io_fp = cStringIO.StringIO(file_data.data)
with self._Time('UPLOAD_%d' % file_size, self.results['latency']):
self.gsutil_api.UploadObject(
io_fp, upload_target, size=file_size, provider=self.provider,
@@ -508,8 +721,7 @@ class PerfDiagCommand(Command):
'mediaLink', 'size'])
# Download will get the metadata first if we don't pass it in.
download_metadata = self._RunOperation(_Metadata)
- serialization_dict = GetDownloadSerializationDict(download_metadata)
- serialization_data = json.dumps(serialization_dict)
+ serialization_data = GetDownloadSerializationData(download_metadata)
def _Download():
with self._Time('DOWNLOAD_%d' % file_size, self.results['latency']):
@@ -524,213 +736,308 @@ class PerfDiagCommand(Command):
provider=self.provider)
self._RunOperation(_Delete)
- class _CpFilter(logging.Filter):
+ class _PerfdiagFilter(logging.Filter):
def filter(self, record):
- # Used to prevent cp._LogCopyOperation from spewing output from
- # subprocesses about every iteration.
+ # Used to prevent unnecessary output when using multiprocessing.
msg = record.getMessage()
return not (('Copying file:///' in msg) or ('Copying gs://' in msg) or
- ('Computing CRC' in msg))
+ ('Computing CRC' in msg) or ('gsutil -m perfdiag' in msg))
def _PerfdiagExceptionHandler(self, e):
"""Simple exception handler to allow post-completion status."""
self.logger.error(str(e))
- def _RunReadThruTests(self):
- """Runs read throughput tests."""
- self.logger.info(
- '\nRunning read throughput tests (%s iterations of size %s)' %
- (self.num_iterations, MakeHumanReadable(self.thru_filesize)))
-
- self.results['read_throughput'] = {'file_size': self.thru_filesize,
- 'num_times': self.num_iterations,
- 'processes': self.processes,
- 'threads': self.threads}
-
- # Copy the TCP warmup file.
- warmup_url = self.bucket_url.Clone()
- warmup_url.object_name = os.path.basename(self.tcp_warmup_file)
- warmup_target = StorageUrlToUploadObjectMetadata(warmup_url)
- self.test_object_names.add(warmup_url.object_name)
-
- def _Upload1():
- self.gsutil_api.UploadObject(
- cStringIO.StringIO(self.file_contents[self.tcp_warmup_file]),
- warmup_target, provider=self.provider, fields=['name'])
- self._RunOperation(_Upload1)
-
- # Copy the file to remote location before reading.
- thru_url = self.bucket_url.Clone()
- thru_url.object_name = os.path.basename(self.thru_local_file)
- thru_target = StorageUrlToUploadObjectMetadata(thru_url)
- thru_target.md5Hash = self.file_md5s[self.thru_local_file]
- self.test_object_names.add(thru_url.object_name)
-
- # Get the mediaLink here so that we can pass it to download.
- def _Upload2():
- return self.gsutil_api.UploadObject(
- cStringIO.StringIO(self.file_contents[self.thru_local_file]),
- thru_target, provider=self.provider, size=self.thru_filesize,
- fields=['name', 'mediaLink', 'size'])
-
- # Get the metadata for the object so that we are just measuring performance
- # on the actual bytes transfer.
- download_metadata = self._RunOperation(_Upload2)
- serialization_dict = GetDownloadSerializationDict(download_metadata)
- serialization_data = json.dumps(serialization_dict)
+ def PerformFannedDownload(self, need_to_slice, object_names, file_names,
+ serialization_data):
+ """Performs a parallel download of multiple objects using the fan strategy.
- if self.processes == 1 and self.threads == 1:
+ Args:
+ need_to_slice: If True, additionally apply the slice strategy to each
+ object in object_names.
+ object_names: A list of object names to be downloaded. Each object must
+ already exist in the test bucket.
+ file_names: A list, corresponding by index to object_names, of file names
+ for downloaded data. If None, discard downloaded data.
+ serialization_data: A list, corresponding by index to object_names,
+ of serialization data for each object.
+ """
+ args = []
+ for i in range(len(object_names)):
+ file_name = file_names[i] if file_names else None
+ args.append(FanDownloadTuple(
+ need_to_slice, object_names[i], file_name,
+ serialization_data[i]))
+ self.Apply(_DownloadObject, args, _PerfdiagExceptionHandler,
+ ('total_requests', 'request_errors'),
+ arg_checker=DummyArgChecker, parallel_operations_override=True,
+ process_count=self.processes, thread_count=self.threads)
+
+ def PerformSlicedDownload(self, object_name, file_name, serialization_data):
+ """Performs a download of an object using the slice strategy.
- # Warm up the TCP connection.
- def _Warmup():
- self.gsutil_api.GetObjectMedia(warmup_url.bucket_name,
- warmup_url.object_name,
- self.discard_sink,
- provider=self.provider)
- self._RunOperation(_Warmup)
+ Args:
+ object_name: The name of the object to download.
+ file_name: The name of the file to download data to, or None if data
+ should be discarded.
+ serialization_data: The serialization data for the object.
+ """
+ if file_name:
+ with open(file_name, 'ab') as fp:
+ fp.truncate(self.thru_filesize)
+ component_size = DivideAndCeil(self.thru_filesize, self.num_slices)
+ args = []
+ for i in range(self.num_slices):
+ start_byte = i * component_size
+ end_byte = min((i + 1) * (component_size) - 1, self.thru_filesize - 1)
+ args.append(SliceDownloadTuple(object_name, file_name, serialization_data,
+ start_byte, end_byte))
+ self.Apply(_DownloadSlice, args, _PerfdiagExceptionHandler,
+ ('total_requests', 'request_errors'),
+ arg_checker=DummyArgChecker, parallel_operations_override=True,
+ process_count=self.processes, thread_count=self.threads)
+
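
A worked example of the slice-boundary arithmetic above, with illustrative numbers (10 MiB object, 4 slices); DivideAndCeil is gsutil's ceiling-division helper, inlined here as -(-a // b):

  thru_filesize = 10 * 1024 * 1024                   # 10485760 bytes (illustrative)
  num_slices = 4
  component_size = -(-thru_filesize // num_slices)   # 2621440 bytes per slice
  for i in range(num_slices):
    start_byte = i * component_size
    end_byte = min((i + 1) * component_size - 1, thru_filesize - 1)
  # Resulting (start_byte, end_byte) pairs: (0, 2621439), (2621440, 5242879),
  # (5242880, 7864319), (7864320, 10485759).
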
+ def PerformFannedUpload(self, need_to_slice, file_names, object_names,
+ use_file):
+ """Performs a parallel upload of multiple files using the fan strategy.
+
+ The metadata for file_name should be present in temp_file_dict prior
+ to calling. Also, the data for file_name should be present in temp_file_dict
+ if use_file is specified as False.
- times = []
+ Args:
+ need_to_slice: If True, additionally apply the slice strategy to each
+ file in file_names.
+ file_names: A list of file names to be uploaded.
+      object_names: A list, corresponding by index to file_names, of object
+ names to upload data to.
+ use_file: If true, use disk I/O, otherwise read upload data from memory.
+ """
+ args = []
+ for i in range(len(file_names)):
+ args.append(FanUploadTuple(
+ need_to_slice, file_names[i], object_names[i], use_file))
+ self.Apply(_UploadObject, args, _PerfdiagExceptionHandler,
+ ('total_requests', 'request_errors'),
+ arg_checker=DummyArgChecker, parallel_operations_override=True,
+ process_count=self.processes, thread_count=self.threads)
+
+ def PerformSlicedUpload(self, file_name, object_name, use_file):
+ """Performs a parallel upload of a file using the slice strategy.
+
+ The metadata for file_name should be present in temp_file_dict prior
+    to calling. Also, the data for file_name should be present in
+ temp_file_dict if use_file is specified as False.
- def _Download():
- t0 = time.time()
- self.gsutil_api.GetObjectMedia(
- thru_url.bucket_name, thru_url.object_name, self.discard_sink,
- provider=self.provider, serialization_data=serialization_data)
- t1 = time.time()
- times.append(t1 - t0)
- for _ in range(self.num_iterations):
- self._RunOperation(_Download)
- time_took = sum(times)
+ Args:
+ file_name: The name of the file to upload.
+ object_name: The name of the object to upload to.
+ use_file: If true, use disk I/O, otherwise read upload data from memory.
+ """
+ # Divide the file into components.
+ component_size = DivideAndCeil(self.thru_filesize, self.num_slices)
+ component_object_names = (
+ [object_name + str(i) for i in range(self.num_slices)])
+
+ args = []
+ for i in range(self.num_slices):
+ component_start = i * component_size
+ component_size = min(component_size,
+ temp_file_dict[file_name].size - component_start)
+ args.append(SliceUploadTuple(file_name, component_object_names[i],
+ use_file, component_start, component_size))
+
+ # Upload the components in parallel.
+ try:
+ self.Apply(_UploadSlice, args, _PerfdiagExceptionHandler,
+ ('total_requests', 'request_errors'),
+ arg_checker=DummyArgChecker, parallel_operations_override=True,
+ process_count=self.processes, thread_count=self.threads)
+
+ # Compose the components into an object.
+ request_components = []
+ for i in range(self.num_slices):
+ src_obj_metadata = (
+ apitools_messages.ComposeRequest.SourceObjectsValueListEntry(
+ name=component_object_names[i]))
+ request_components.append(src_obj_metadata)
+
+ dst_obj_metadata = apitools_messages.Object()
+ dst_obj_metadata.name = object_name
+ dst_obj_metadata.bucket = self.bucket_url.bucket_name
+ def _Compose():
+ self.gsutil_api.ComposeObject(request_components, dst_obj_metadata,
+ provider=self.provider)
+ self._RunOperation(_Compose)
+ finally:
+ # Delete the temporary components.
+ self.Apply(_DeleteWrapper, component_object_names,
+ _PerfdiagExceptionHandler,
+ ('total_requests', 'request_errors'),
+ arg_checker=DummyArgChecker, parallel_operations_override=True,
+ process_count=self.processes, thread_count=self.threads)
+
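
To make the compose step above concrete, a small sketch using a hypothetical object name and 3 slices (names are illustrative only):

  object_name = 'gsutil_test_file_abc.bin'      # hypothetical
  num_slices = 3
  component_object_names = [object_name + str(i) for i in range(num_slices)]
  # -> ['gsutil_test_file_abc.bin0', 'gsutil_test_file_abc.bin1',
  #     'gsutil_test_file_abc.bin2']
  # Each component is uploaded in parallel via _UploadSlice, ComposeObject then
  # stitches them back into object_name in index order, and the numbered
  # components are deleted in the finally block.
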
+ def _RunReadThruTests(self, use_file=False):
+ """Runs read throughput tests."""
+ test_name = 'read_throughput_file' if use_file else 'read_throughput'
+ file_io_string = 'with file I/O' if use_file else ''
+ self.logger.info(
+ '\nRunning read throughput tests %s (%s objects of size %s)' %
+ (file_io_string, self.num_objects,
+ MakeHumanReadable(self.thru_filesize)))
+ self._WarnIfLargeData()
+
+ self.results[test_name] = {'file_size': self.thru_filesize,
+ 'processes': self.processes,
+ 'threads': self.threads,
+ 'parallelism': self.parallel_strategy
+ }
+
+ # Copy the file(s) to the test bucket, and also get the serialization data
+ # so that we can pass it to download.
+ if use_file:
+      # For tests with file I/O, use N files on disk to preserve seek performance.
+ file_names = self.thru_file_names
+ object_names = self.thru_object_names
+ serialization_data = []
+ for i in range(self.num_objects):
+ self.temporary_objects.add(self.thru_object_names[i])
+ if self.WTHRU_FILE in self.diag_tests:
+ # If we ran the WTHRU_FILE test, then the objects already exist.
+ obj_metadata = self.gsutil_api.GetObjectMetadata(
+ self.bucket_url.bucket_name, self.thru_object_names[i],
+ fields=['size', 'mediaLink'], provider=self.bucket_url.scheme)
+ else:
+ obj_metadata = self.Upload(self.thru_file_names[i],
+ self.thru_object_names[i], use_file)
+
+ # File overwrite causes performance issues with sliced downloads.
+ # Delete the file and reopen it for download. This matches what a real
+ # download would look like.
+ os.unlink(self.thru_file_names[i])
+ open(self.thru_file_names[i], 'ab').close()
+ serialization_data.append(GetDownloadSerializationData(obj_metadata))
else:
- args = ([(thru_url.bucket_name, thru_url.object_name, serialization_data)]
- * self.num_iterations)
- self.logger.addFilter(self._CpFilter())
+ # For in-memory test only use one file but copy it num_objects times, to
+ # allow scalability in num_objects.
+ self.temporary_objects.add(self.mem_thru_object_name)
+ obj_metadata = self.Upload(self.mem_thru_file_name,
+ self.mem_thru_object_name, use_file)
+ file_names = None
+ object_names = [self.mem_thru_object_name] * self.num_objects
+ serialization_data = (
+ [GetDownloadSerializationData(obj_metadata)] * self.num_objects)
+
+    # Warm up the TCP connection.
+ warmup_obj_name = os.path.basename(self.tcp_warmup_file)
+ self.temporary_objects.add(warmup_obj_name)
+ self.Upload(self.tcp_warmup_file, warmup_obj_name)
+ self.Download(warmup_obj_name)
- t0 = time.time()
- self.Apply(_DownloadWrapper,
- args,
- _PerfdiagExceptionHandler,
- arg_checker=DummyArgChecker,
- parallel_operations_override=True,
- process_count=self.processes,
- thread_count=self.threads)
- t1 = time.time()
- time_took = t1 - t0
+ t0 = time.time()
+ if self.processes == 1 and self.threads == 1:
+ for i in range(self.num_objects):
+ file_name = file_names[i] if use_file else None
+ self.Download(object_names[i], file_name, serialization_data[i])
+ else:
+ if self.parallel_strategy in (self.FAN, self.BOTH):
+ need_to_slice = (self.parallel_strategy == self.BOTH)
+ self.PerformFannedDownload(need_to_slice, object_names, file_names,
+ serialization_data)
+ elif self.parallel_strategy == self.SLICE:
+ for i in range(self.num_objects):
+ file_name = file_names[i] if use_file else None
+ self.PerformSlicedDownload(
+ object_names[i], file_name, serialization_data[i])
+ t1 = time.time()
- total_bytes_copied = self.thru_filesize * self.num_iterations
+ time_took = t1 - t0
+ total_bytes_copied = self.thru_filesize * self.num_objects
bytes_per_second = total_bytes_copied / time_took
- self.results['read_throughput']['time_took'] = time_took
- self.results['read_throughput']['total_bytes_copied'] = total_bytes_copied
- self.results['read_throughput']['bytes_per_second'] = bytes_per_second
+ self.results[test_name]['time_took'] = time_took
+ self.results[test_name]['total_bytes_copied'] = total_bytes_copied
+ self.results[test_name]['bytes_per_second'] = bytes_per_second
- def _RunWriteThruTests(self):
+ def _RunWriteThruTests(self, use_file=False):
"""Runs write throughput tests."""
+ test_name = 'write_throughput_file' if use_file else 'write_throughput'
+ file_io_string = 'with file I/O' if use_file else ''
self.logger.info(
- '\nRunning write throughput tests (%s iterations of size %s)' %
- (self.num_iterations, MakeHumanReadable(self.thru_filesize)))
-
- self.results['write_throughput'] = {'file_size': self.thru_filesize,
- 'num_copies': self.num_iterations,
- 'processes': self.processes,
- 'threads': self.threads}
-
- warmup_url = self.bucket_url.Clone()
- warmup_url.object_name = os.path.basename(self.tcp_warmup_file)
- warmup_target = StorageUrlToUploadObjectMetadata(warmup_url)
- self.test_object_names.add(warmup_url.object_name)
-
- thru_url = self.bucket_url.Clone()
- thru_url.object_name = os.path.basename(self.thru_local_file)
- thru_target = StorageUrlToUploadObjectMetadata(thru_url)
- thru_tuples = []
- for i in xrange(self.num_iterations):
- # Create a unique name for each uploaded object. Otherwise,
- # the XML API would fail when trying to non-atomically get metadata
- # for the object that gets blown away by the overwrite.
- remote_object_name = thru_target.name + str(i)
- self.test_object_names.add(remote_object_name)
- thru_tuples.append(UploadObjectTuple(thru_target.bucket,
- remote_object_name,
- filepath=self.thru_local_file))
-
- if self.processes == 1 and self.threads == 1:
- # Warm up the TCP connection.
- def _Warmup():
- self.gsutil_api.UploadObject(
- cStringIO.StringIO(self.file_contents[self.tcp_warmup_file]),
- warmup_target, provider=self.provider, size=self.thru_filesize,
- fields=['name'])
- self._RunOperation(_Warmup)
-
- times = []
-
- for i in xrange(self.num_iterations):
- thru_tuple = thru_tuples[i]
- def _Upload():
- """Uploads the write throughput measurement object."""
- upload_target = apitools_messages.Object(
- bucket=thru_tuple.bucket_name, name=thru_tuple.object_name,
- md5Hash=thru_tuple.md5)
- io_fp = cStringIO.StringIO(self.file_contents[self.thru_local_file])
- t0 = time.time()
- if self.thru_filesize < ResumableThreshold():
- self.gsutil_api.UploadObject(
- io_fp, upload_target, provider=self.provider,
- size=self.thru_filesize, fields=['name'])
- else:
- self.gsutil_api.UploadObjectResumable(
- io_fp, upload_target, provider=self.provider,
- size=self.thru_filesize, fields=['name'],
- tracker_callback=_DummyTrackerCallback)
-
- t1 = time.time()
- times.append(t1 - t0)
+ '\nRunning write throughput tests %s (%s objects of size %s)' %
+ (file_io_string, self.num_objects,
+ MakeHumanReadable(self.thru_filesize)))
+ self._WarnIfLargeData()
+
+ self.results[test_name] = {'file_size': self.thru_filesize,
+ 'processes': self.processes,
+ 'threads': self.threads,
+ 'parallelism': self.parallel_strategy}
+
+    # Warm up the TCP connection.
+ warmup_obj_name = os.path.basename(self.tcp_warmup_file)
+ self.temporary_objects.add(warmup_obj_name)
+ self.Upload(self.tcp_warmup_file, warmup_obj_name)
+
+ if use_file:
+      # For tests with file I/O, use N files on disk to preserve seek performance.
+ file_names = self.thru_file_names
+ object_names = self.thru_object_names
+ else:
+ # For in-memory test only use one file but copy it num_objects times, to
+ # allow for scalability in num_objects.
+ file_names = [self.mem_thru_file_name] * self.num_objects
+ object_names = (
+ [self.mem_thru_object_name + str(i) for i in range(self.num_objects)])
- self._RunOperation(_Upload)
- time_took = sum(times)
+ for object_name in object_names:
+ self.temporary_objects.add(object_name)
+ t0 = time.time()
+ if self.processes == 1 and self.threads == 1:
+ for i in range(self.num_objects):
+ self.Upload(file_names[i], object_names[i], use_file)
else:
- args = thru_tuples
- t0 = time.time()
- self.Apply(_UploadWrapper,
- args,
- _PerfdiagExceptionHandler,
- arg_checker=DummyArgChecker,
- parallel_operations_override=True,
- process_count=self.processes,
- thread_count=self.threads)
- t1 = time.time()
- time_took = t1 - t0
+ if self.parallel_strategy in (self.FAN, self.BOTH):
+ need_to_slice = (self.parallel_strategy == self.BOTH)
+ self.PerformFannedUpload(need_to_slice, file_names, object_names,
+ use_file)
+ elif self.parallel_strategy == self.SLICE:
+ for i in range(self.num_objects):
+ self.PerformSlicedUpload(file_names[i], object_names[i], use_file)
+ t1 = time.time()
- total_bytes_copied = self.thru_filesize * self.num_iterations
+ time_took = t1 - t0
+ total_bytes_copied = self.thru_filesize * self.num_objects
bytes_per_second = total_bytes_copied / time_took
- self.results['write_throughput']['time_took'] = time_took
- self.results['write_throughput']['total_bytes_copied'] = total_bytes_copied
- self.results['write_throughput']['bytes_per_second'] = bytes_per_second
+ self.results[test_name]['time_took'] = time_took
+ self.results[test_name]['total_bytes_copied'] = total_bytes_copied
+ self.results[test_name]['bytes_per_second'] = bytes_per_second
def _RunListTests(self):
"""Runs eventual consistency listing latency tests."""
- self.results['listing'] = {'num_files': self.num_iterations}
+ self.results['listing'] = {'num_files': self.num_objects}
- # Generate N random object names to put in the bucket.
+ # Generate N random objects to put into the bucket.
list_prefix = 'gsutil-perfdiag-list-'
+ list_fpaths = []
list_objects = []
- for _ in xrange(self.num_iterations):
- list_object_name = u'%s%s' % (list_prefix, os.urandom(20).encode('hex'))
- self.test_object_names.add(list_object_name)
- list_objects.append(list_object_name)
+ args = []
+ for _ in xrange(self.num_objects):
+ fpath = self._MakeTempFile(0, mem_data=True, mem_metadata=True,
+ prefix=list_prefix)
+ list_fpaths.append(fpath)
+ object_name = os.path.basename(fpath)
+ list_objects.append(object_name)
+ args.append(FanUploadTuple(False, fpath, object_name, False))
+ self.temporary_objects.add(object_name)
# Add the objects to the bucket.
self.logger.info(
- '\nWriting %s objects for listing test...', self.num_iterations)
- empty_md5 = CalculateB64EncodedMd5FromContents(cStringIO.StringIO(''))
- args = [
- UploadObjectTuple(self.bucket_url.bucket_name, name, md5=empty_md5,
- contents='') for name in list_objects]
- self.Apply(_UploadWrapper, args, _PerfdiagExceptionHandler,
+ '\nWriting %s objects for listing test...', self.num_objects)
+
+ self.Apply(_UploadObject, args, _PerfdiagExceptionHandler,
arg_checker=DummyArgChecker)
list_latencies = []
@@ -743,7 +1050,7 @@ class PerfDiagCommand(Command):
"""Lists and returns objects in the bucket. Also records latency."""
t0 = time.time()
objects = list(self.gsutil_api.ListObjects(
- self.bucket_url.bucket_name, prefix=list_prefix, delimiter='/',
+ self.bucket_url.bucket_name, delimiter='/',
provider=self.provider, fields=['items/name']))
t1 = time.time()
list_latencies.append(t1 - t0)
@@ -751,7 +1058,7 @@ class PerfDiagCommand(Command):
self.logger.info(
'Listing bucket %s waiting for %s objects to appear...',
- self.bucket_url.bucket_name, self.num_iterations)
+ self.bucket_url.bucket_name, self.num_objects)
while expected_objects - found_objects:
def _ListAfterUpload():
names = _List()
@@ -771,14 +1078,15 @@ class PerfDiagCommand(Command):
'time_took': total_end_time - total_start_time,
}
+ args = [object_name for object_name in list_objects]
self.logger.info(
- 'Deleting %s objects for listing test...', self.num_iterations)
+ 'Deleting %s objects for listing test...', self.num_objects)
self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler,
arg_checker=DummyArgChecker)
self.logger.info(
'Listing bucket %s waiting for %s objects to disappear...',
- self.bucket_url.bucket_name, self.num_iterations)
+ self.bucket_url.bucket_name, self.num_objects)
list_latencies = []
files_seen = []
total_start_time = time.time()
@@ -802,45 +1110,102 @@ class PerfDiagCommand(Command):
'time_took': total_end_time - total_start_time,
}
- def Upload(self, thru_tuple, thread_state=None):
- gsutil_api = GetCloudApiInstance(self, thread_state)
-
- md5hash = thru_tuple.md5
- contents = thru_tuple.contents
- if thru_tuple.filepath:
- md5hash = self.file_md5s[thru_tuple.filepath]
- contents = self.file_contents[thru_tuple.filepath]
-
- upload_target = apitools_messages.Object(
- bucket=thru_tuple.bucket_name, name=thru_tuple.object_name,
- md5Hash=md5hash)
- file_size = len(contents)
- if file_size < ResumableThreshold():
- gsutil_api.UploadObject(
- cStringIO.StringIO(contents), upload_target,
- provider=self.provider, size=file_size, fields=['name'])
- else:
- gsutil_api.UploadObjectResumable(
- cStringIO.StringIO(contents), upload_target,
- provider=self.provider, size=file_size, fields=['name'],
- tracker_callback=_DummyTrackerCallback)
+ def Upload(self, file_name, object_name, use_file=False, file_start=0,
+ file_size=None):
+ """Performs an upload to the test bucket.
- def Download(self, download_tuple, thread_state=None):
- """Downloads a file.
+ The file is uploaded to the bucket referred to by self.bucket_url, and has
+ name object_name.
Args:
- download_tuple: (bucket name, object name, serialization data for object).
- thread_state: gsutil Cloud API instance to use for the download.
+ file_name: The path to the local file, and the key to its entry in
+ temp_file_dict.
+ object_name: The name of the remote object.
+ use_file: If true, use disk I/O, otherwise read everything from memory.
+ file_start: The first byte in the file to upload to the object.
+                  (should only be specified for sliced uploads)
+      file_size: The size of the file to upload.
+                  (should only be specified for sliced uploads)
+
+ Returns:
+ Uploaded Object Metadata.
"""
- gsutil_api = GetCloudApiInstance(self, thread_state)
- gsutil_api.GetObjectMedia(
- download_tuple[0], download_tuple[1], self.discard_sink,
- provider=self.provider, serialization_data=download_tuple[2])
+ fp = None
+ if file_size is None:
+ file_size = temp_file_dict[file_name].size
- def Delete(self, thru_tuple, thread_state=None):
- gsutil_api = thread_state or self.gsutil_api
- gsutil_api.DeleteObject(
- thru_tuple.bucket_name, thru_tuple.object_name, provider=self.provider)
+ upload_url = self.bucket_url.Clone()
+ upload_url.object_name = object_name
+ upload_target = StorageUrlToUploadObjectMetadata(upload_url)
+
+ try:
+ if use_file:
+ fp = FilePart(file_name, file_start, file_size)
+ else:
+ data = temp_file_dict[file_name].data[file_start:file_start+file_size]
+ fp = cStringIO.StringIO(data)
+
+ def _InnerUpload():
+ if file_size < ResumableThreshold():
+ return self.gsutil_api.UploadObject(
+ fp, upload_target, provider=self.provider, size=file_size,
+ fields=['name', 'mediaLink', 'size'])
+ else:
+ return self.gsutil_api.UploadObjectResumable(
+ fp, upload_target, provider=self.provider, size=file_size,
+ fields=['name', 'mediaLink', 'size'],
+ tracker_callback=_DummyTrackerCallback)
+ return self._RunOperation(_InnerUpload)
+ finally:
+ if fp:
+ fp.close()
+
+ def Download(self, object_name, file_name=None, serialization_data=None,
+ start_byte=0, end_byte=None):
+ """Downloads an object from the test bucket.
+
+ Args:
+ object_name: The name of the object (in the test bucket) to download.
+ file_name: Optional file name to write downloaded data to. If None,
+ downloaded data is discarded immediately.
+ serialization_data: Optional serialization data, used so that we don't
+ have to get the metadata before downloading.
+ start_byte: The first byte in the object to download.
+                  (should only be specified for sliced downloads)
+      end_byte: The last byte in the object to download.
+                  (should only be specified for sliced downloads)
+ """
+ fp = None
+ try:
+ if file_name is not None:
+ fp = open(file_name, 'r+b')
+ fp.seek(start_byte)
+ else:
+ fp = self.discard_sink
+
+ def _InnerDownload():
+ self.gsutil_api.GetObjectMedia(
+ self.bucket_url.bucket_name, object_name, fp,
+ provider=self.provider, start_byte=start_byte, end_byte=end_byte,
+ serialization_data=serialization_data)
+ self._RunOperation(_InnerDownload)
+ finally:
+ if fp:
+ fp.close()
+
+ def Delete(self, object_name):
+ """Deletes an object from the test bucket.
+
+ Args:
+ object_name: The name of the object to delete.
+ """
+ try:
+ def _InnerDelete():
+ self.gsutil_api.DeleteObject(self.bucket_url.bucket_name, object_name,
+ provider=self.provider)
+ self._RunOperation(_InnerDelete)
+ except NotFoundException:
+ pass
def _GetDiskCounters(self):
"""Retrieves disk I/O statistics for all disks.
@@ -966,7 +1331,7 @@ class PerfDiagCommand(Command):
sysinfo['ip_address'] = ''
# Record the temporary directory used since it can affect performance, e.g.
# when on a networked filesystem.
- sysinfo['tempdir'] = tempfile.gettempdir()
+ sysinfo['tempdir'] = self.directory
# Produces an RFC 2822 compliant GMT timestamp.
sysinfo['gmt_timestamp'] = time.strftime('%a, %d %b %Y %H:%M:%S +0000',
@@ -1178,12 +1543,27 @@ class PerfDiagCommand(Command):
print 'Write Throughput'.center(78)
print '-' * 78
write_thru = self.results['write_throughput']
- print 'Copied a %s file %d times for a total transfer size of %s.' % (
+ print 'Copied %s %s file(s) for a total transfer size of %s.' % (
+ self.num_objects,
MakeHumanReadable(write_thru['file_size']),
- write_thru['num_copies'],
MakeHumanReadable(write_thru['total_bytes_copied']))
print 'Write throughput: %s/s.' % (
MakeBitsHumanReadable(write_thru['bytes_per_second'] * 8))
+ print 'Parallelism strategy: %s' % write_thru['parallelism']
+
+ if 'write_throughput_file' in self.results:
+ print
+ print '-' * 78
+ print 'Write Throughput With File I/O'.center(78)
+ print '-' * 78
+ write_thru_file = self.results['write_throughput_file']
+ print 'Copied %s %s file(s) for a total transfer size of %s.' % (
+ self.num_objects,
+ MakeHumanReadable(write_thru_file['file_size']),
+ MakeHumanReadable(write_thru_file['total_bytes_copied']))
+ print 'Write throughput: %s/s.' % (
+ MakeBitsHumanReadable(write_thru_file['bytes_per_second'] * 8))
+ print 'Parallelism strategy: %s' % write_thru_file['parallelism']
if 'read_throughput' in self.results:
print
@@ -1191,12 +1571,27 @@ class PerfDiagCommand(Command):
print 'Read Throughput'.center(78)
print '-' * 78
read_thru = self.results['read_throughput']
- print 'Copied a %s file %d times for a total transfer size of %s.' % (
+ print 'Copied %s %s file(s) for a total transfer size of %s.' % (
+ self.num_objects,
MakeHumanReadable(read_thru['file_size']),
- read_thru['num_times'],
MakeHumanReadable(read_thru['total_bytes_copied']))
print 'Read throughput: %s/s.' % (
MakeBitsHumanReadable(read_thru['bytes_per_second'] * 8))
+ print 'Parallelism strategy: %s' % read_thru['parallelism']
+
+ if 'read_throughput_file' in self.results:
+ print
+ print '-' * 78
+ print 'Read Throughput With File I/O'.center(78)
+ print '-' * 78
+ read_thru_file = self.results['read_throughput_file']
+ print 'Copied %s %s file(s) for a total transfer size of %s.' % (
+ self.num_objects,
+ MakeHumanReadable(read_thru_file['file_size']),
+ MakeHumanReadable(read_thru_file['total_bytes_copied']))
+ print 'Read throughput: %s/s.' % (
+ MakeBitsHumanReadable(read_thru_file['bytes_per_second'] * 8))
+ print 'Parallelism strategy: %s' % read_thru_file['parallelism']
if 'listing' in self.results:
print
@@ -1389,15 +1784,23 @@ class PerfDiagCommand(Command):
def _ParseArgs(self):
"""Parses arguments for perfdiag command."""
# From -n.
- self.num_iterations = 5
+ self.num_objects = 5
# From -c.
self.processes = 1
# From -k.
self.threads = 1
+    # From -p.
+    self.parallel_strategy = None
+    # From -y.
+    self.num_slices = 4
# From -s.
self.thru_filesize = 1048576
+ # From -d.
+ self.directory = tempfile.gettempdir()
+ # Keep track of whether or not to delete the directory upon completion.
+ self.delete_directory = False
# From -t.
- self.diag_tests = self.DEFAULT_DIAG_TESTS
+ self.diag_tests = set(self.DEFAULT_DIAG_TESTS)
# From -o.
self.output_file = None
# From -i.
@@ -1408,7 +1811,7 @@ class PerfDiagCommand(Command):
if self.sub_opts:
for o, a in self.sub_opts:
if o == '-n':
- self.num_iterations = self._ParsePositiveInteger(
+ self.num_objects = self._ParsePositiveInteger(
a, 'The -n parameter must be a positive integer.')
if o == '-c':
self.processes = self._ParsePositiveInteger(
@@ -1416,21 +1819,32 @@ class PerfDiagCommand(Command):
if o == '-k':
self.threads = self._ParsePositiveInteger(
a, 'The -k parameter must be a positive integer.')
+ if o == '-p':
+ if a.lower() in self.PARALLEL_STRATEGIES:
+ self.parallel_strategy = a.lower()
+ else:
+ raise CommandException(
+ "'%s' is not a valid parallelism strategy." % a)
+ if o == '-y':
+ self.num_slices = self._ParsePositiveInteger(
+ a, 'The -y parameter must be a positive integer.')
if o == '-s':
try:
self.thru_filesize = HumanReadableToBytes(a)
except ValueError:
raise CommandException('Invalid -s parameter.')
- if self.thru_filesize > (20 * 1024 ** 3): # Max 20 GiB.
- raise CommandException(
- 'Maximum throughput file size parameter (-s) is 20 GiB.')
+ if o == '-d':
+ self.directory = a
+ if not os.path.exists(self.directory):
+ self.delete_directory = True
+ os.makedirs(self.directory)
if o == '-t':
- self.diag_tests = []
+ self.diag_tests = set()
for test_name in a.strip().split(','):
if test_name.lower() not in self.ALL_DIAG_TESTS:
raise CommandException("List of test names (-t) contains invalid "
"test name '%s'." % test_name)
- self.diag_tests.append(test_name)
+ self.diag_tests.add(test_name)
if o == '-m':
pieces = a.split(':')
if len(pieces) != 2:
@@ -1452,15 +1866,39 @@ class PerfDiagCommand(Command):
raise CommandException("Could not decode input file (-i): '%s'." %
a)
return
+
+ # If parallelism is specified, default parallelism strategy to fan.
+ if (self.processes > 1 or self.threads > 1) and not self.parallel_strategy:
+ self.parallel_strategy = self.FAN
+ elif self.processes == 1 and self.threads == 1 and self.parallel_strategy:
+ raise CommandException(
+ 'Cannot specify parallelism strategy (-p) without also specifying '
+ 'multiple threads and/or processes (-c and/or -k).')
+
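
A few illustrative flag combinations showing how the defaulting rule above resolves (the behavior is summarized from the code; the combinations themselves are hypothetical):

  # -c 1 -k 1            -> parallel_strategy stays None; transfers run serially
  # -c 2 -k 4            -> parallel_strategy defaults to 'fan'
  # -c 2 -k 4 -p slice   -> 'slice' (each object split into -y slices)
  # -c 1 -k 1 -p both    -> CommandException: -p requires -c and/or -k > 1
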
if not self.args:
self.RaiseWrongNumberOfArgumentsException()
self.bucket_url = StorageUrlFromString(self.args[0])
self.provider = self.bucket_url.scheme
- if not (self.bucket_url.IsCloudUrl() and self.bucket_url.IsBucket()):
+    if not (self.bucket_url.IsCloudUrl() and self.bucket_url.IsBucket()):
raise CommandException('The perfdiag command requires a URL that '
'specifies a bucket.\n"%s" is not '
'valid.' % self.args[0])
+
+ if (self.thru_filesize > HumanReadableToBytes('2GiB') and
+ (self.RTHRU in self.diag_tests or self.WTHRU in self.diag_tests)):
+ raise CommandException(
+ 'For in-memory tests maximum file size is 2GiB. For larger file '
+ 'sizes, specify rthru_file and/or wthru_file with the -t option.')
+
+ perform_slice = self.parallel_strategy in (self.SLICE, self.BOTH)
+ slice_not_available = (
+        self.provider == 's3' and self.diag_tests.intersection(
+            (self.WTHRU, self.WTHRU_FILE)))
+ if perform_slice and slice_not_available:
+ raise CommandException('Sliced uploads are not available for s3. '
+ 'Use -p fan or sequential uploads for s3.')
+
# Ensure the bucket exists.
self.gsutil_api.GetBucket(self.bucket_url.bucket_name,
provider=self.bucket_url.scheme,
@@ -1487,12 +1925,14 @@ class PerfDiagCommand(Command):
'Base bucket URI: %s\n'
'Number of processes: %d\n'
'Number of threads: %d\n'
+ 'Parallelism strategy: %s\n'
'Throughput file size: %s\n'
'Diagnostics to run: %s',
- self.num_iterations,
+ self.num_objects,
self.bucket_url,
self.processes,
self.threads,
+ self.parallel_strategy,
MakeHumanReadable(self.thru_filesize),
(', '.join(self.diag_tests)))
@@ -1516,6 +1956,12 @@ class PerfDiagCommand(Command):
self._RunLatencyTests()
if self.RTHRU in self.diag_tests:
self._RunReadThruTests()
+ # Run WTHRU_FILE before RTHRU_FILE. If data is created in WTHRU_FILE it
+ # will be used in RTHRU_FILE to save time and bandwidth.
+ if self.WTHRU_FILE in self.diag_tests:
+ self._RunWriteThruTests(use_file=True)
+ if self.RTHRU_FILE in self.diag_tests:
+ self._RunReadThruTests(use_file=True)
if self.WTHRU in self.diag_tests:
self._RunWriteThruTests()
if self.LIST in self.diag_tests:
@@ -1535,6 +1981,7 @@ class PerfDiagCommand(Command):
self.results['gsutil_version'] = gslib.VERSION
self.results['boto_version'] = boto.__version__
+ self._TearDown()
self._DisplayResults()
finally:
# TODO: Install signal handlers so this is performed in response to a
@@ -1545,40 +1992,6 @@ class PerfDiagCommand(Command):
return 0
-class UploadObjectTuple(object):
- """Picklable tuple with necessary metadata for an insert object call."""
-
- def __init__(self, bucket_name, object_name, filepath=None, md5=None,
- contents=None):
- """Create an upload tuple.
-
- Args:
- bucket_name: Name of the bucket to upload to.
- object_name: Name of the object to upload to.
- filepath: A file path located in self.file_contents and self.file_md5s.
- md5: The MD5 hash of the object being uploaded.
- contents: The contents of the file to be uploaded.
-
- Note: (contents + md5) and filepath are mutually exlusive. You may specify
- one or the other, but not both.
- Note: If one of contents or md5 are specified, they must both be specified.
-
- Raises:
- InvalidArgument: if the arguments are invalid.
- """
- self.bucket_name = bucket_name
- self.object_name = object_name
- self.filepath = filepath
- self.md5 = md5
- self.contents = contents
- if filepath and (md5 or contents is not None):
- raise InvalidArgument(
- 'Only one of filepath or (md5 + contents) may be specified.')
- if not filepath and (not md5 or contents is None):
- raise InvalidArgument(
- 'Both md5 and contents must be specified.')
-
-
def StorageUrlToUploadObjectMetadata(storage_url):
if storage_url.IsCloudUrl() and storage_url.IsObject():
upload_target = apitools_messages.Object()
