Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(307)

Unified Diff: gslib/commands/perfdiag.py

Issue 698893003: Update checked in version of gsutil to version 4.6 (Closed) Base URL: http://dart.googlecode.com/svn/third_party/gsutil/
Patch Set: Created 6 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « gslib/commands/notification.py ('k') | gslib/commands/rb.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: gslib/commands/perfdiag.py
===================================================================
--- gslib/commands/perfdiag.py (revision 33376)
+++ gslib/commands/perfdiag.py (working copy)
@@ -1,3 +1,4 @@
+# -*- coding: utf-8 -*-
# Copyright 2012 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -11,16 +12,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
"""Contains the perfdiag gsutil command."""
-# Get the system logging module, not our local logging module.
from __future__ import absolute_import
import calendar
from collections import defaultdict
import contextlib
+import cStringIO
import datetime
+import httplib
import json
import logging
import math
@@ -34,48 +35,42 @@
import tempfile
import time
+from apiclient import errors as apiclient_errors
import boto
-from boto.utils import compute_md5
import boto.gs.connection
import gslib
+from gslib.cloud_api import NotFoundException
+from gslib.cloud_api import ServiceException
+from gslib.cloud_api_helper import GetDownloadSerializationDict
from gslib.command import Command
-from gslib.command import COMMAND_NAME
-from gslib.command import COMMAND_NAME_ALIASES
from gslib.command import DummyArgChecker
-from gslib.command import FILE_URIS_OK
-from gslib.command import MAX_ARGS
-from gslib.command import MIN_ARGS
-from gslib.command import PROVIDER_URIS_OK
-from gslib.command import SUPPORTED_SUB_ARGS
-from gslib.command import URIS_START_ARG
-#from gslib.command_runner import CommandRunner
from gslib.commands import config
+from gslib.cs_api_map import ApiSelector
from gslib.exception import CommandException
-from gslib.help_provider import HELP_NAME
-from gslib.help_provider import HELP_NAME_ALIASES
-from gslib.help_provider import HELP_ONE_LINE_SUMMARY
-from gslib.help_provider import HELP_TEXT
-from gslib.help_provider import HELP_TYPE
-from gslib.help_provider import HelpType
-from gslib.util import GetBotoConfigFileList
+from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
+from gslib.storage_url import StorageUrlFromString
+from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
+from gslib.util import GetCloudApiInstance
+from gslib.util import GetMaxRetryDelay
from gslib.util import HumanReadableToBytes
from gslib.util import IS_LINUX
from gslib.util import MakeBitsHumanReadable
from gslib.util import MakeHumanReadable
from gslib.util import Percentile
+from gslib.util import ResumableThreshold
-_detailed_help_text = ("""
+_DETAILED_HELP_TEXT = ("""
<B>SYNOPSIS</B>
gsutil perfdiag [-i in.json] [-o out.json] [-n iterations] [-c processes]
- [-k threads] [-s size] [-t tests] uri...
+ [-k threads] [-s size] [-t tests] url...
<B>DESCRIPTION</B>
The perfdiag command runs a suite of diagnostic tests for a given Google
Storage bucket.
- The 'uri' parameter must name an existing bucket (e.g. gs://foo) to which
+ The 'url' parameter must name an existing bucket (e.g. gs://foo) to which
the user has write permission. Several test files will be uploaded to and
downloaded from this bucket. All test files will be deleted at the completion
of the diagnostic if it finishes successfully.
@@ -122,6 +117,13 @@
retrieving its metadata, reading the file, and deleting
the file. Records the latency of each operation.
+ list
+ Write N (set with -n) objects to the bucket, record how long
+ it takes for the eventually consistent listing call to return
+ the N objects in its result, delete the N objects, then record
+ how long it takes listing to stop returning the N objects.
+ This test is off by default.
+
rthru
Runs N (set with -n) read operations, with at most C
(set with -c) reads outstanding at any given time.
@@ -163,65 +165,77 @@
<B>NOTE</B>
The perfdiag command collects system information. It collects your IP address,
executes DNS queries to Google servers and collects the results, and collects
- network statistics information from the output of netstat -s. None of this
- information will be sent to Google unless you choose to send it.
+ network statistics information from the output of netstat -s. It will also
+ attempt to connect to your proxy server if you have one configured. None of
+ this information will be sent to Google unless you choose to send it.
""")
-def _DownloadKey(cls, key):
- key.get_contents_to_file(cls.devnull, **cls.get_contents_to_file_args)
-
-def _UploadKey(cls, key):
- return key.set_contents_from_string(cls.file_contents[cls.thru_local_file],
- md5=cls.file_md5s[cls.thru_local_file])
-
+
+class Error(Exception):
+ """Base exception class for this module."""
+ pass
+
+
+class InvalidArgument(Error):
+ """Raised on invalid arguments to functions."""
+ pass
+
+
+def _DownloadWrapper(cls, arg, thread_state=None):
+ cls.Download(arg, thread_state=thread_state)
+
+
+def _UploadWrapper(cls, arg, thread_state=None):
+ cls.Upload(arg, thread_state=thread_state)
+
+
+def _DeleteWrapper(cls, arg, thread_state=None):
+ cls.Delete(arg, thread_state=thread_state)
+
+
def _PerfdiagExceptionHandler(cls, e):
"""Simple exception handler to allow post-completion status."""
cls.logger.error(str(e))
-
+
+def _DummyTrackerCallback(_):
+ pass
+
+
class DummyFile(object):
"""A dummy, file-like object that throws away everything written to it."""
- def write(self, *args, **kwargs): # pylint: disable-msg=C6409
+ def write(self, *args, **kwargs): # pylint: disable=invalid-name
pass
class PerfDiagCommand(Command):
"""Implementation of gsutil perfdiag command."""
- # Command specification (processed by parent class).
- command_spec = {
- # Name of command.
- COMMAND_NAME: 'perfdiag',
- # List of command name aliases.
- COMMAND_NAME_ALIASES: ['diag', 'diagnostic', 'perf', 'performance'],
- # Min number of args required by this command.
- MIN_ARGS: 0,
- # Max number of args required by this command, or NO_MAX.
- MAX_ARGS: 1,
- # Getopt-style string specifying acceptable sub args.
- SUPPORTED_SUB_ARGS: 'n:c:k:s:t:m:i:o:',
- # True if file URIs acceptable for this command.
- FILE_URIS_OK: False,
- # True if provider-only URIs acceptable for this command.
- PROVIDER_URIS_OK: False,
- # Index in args of first URI arg.
- URIS_START_ARG: 0,
- }
- help_spec = {
- # Name of command or auxiliary help info for which this help applies.
- HELP_NAME: 'perfdiag',
- # List of help name aliases.
- HELP_NAME_ALIASES: [],
- # Type of help:
- HELP_TYPE: HelpType.COMMAND_HELP,
- # One line summary of this help.
- HELP_ONE_LINE_SUMMARY: 'Run performance diagnostic',
- # The full help text.
- HELP_TEXT: _detailed_help_text,
- }
+ # Command specification. See base class for documentation.
+ command_spec = Command.CreateCommandSpec(
+ 'perfdiag',
+ command_name_aliases=['diag', 'diagnostic', 'perf', 'performance'],
+ min_args=0,
+ max_args=1,
+ supported_sub_args='n:c:k:s:t:m:i:o:',
+ file_url_ok=False,
+ provider_url_ok=False,
+ urls_start_arg=0,
+ gs_api_support=[ApiSelector.XML, ApiSelector.JSON],
+ gs_default_api=ApiSelector.JSON,
+ )
+ # Help specification. See help_provider.py for documentation.
+ help_spec = Command.HelpSpec(
+ help_name='perfdiag',
+ help_name_aliases=[],
+ help_type='command_help',
+ help_one_line_summary='Run performance diagnostic',
+ help_text=_DETAILED_HELP_TEXT,
+ subcommand_help_text={},
+ )
- # Byte sizes to use for testing files.
+ # Byte sizes to use for latency testing files.
# TODO: Consider letting the user specify these sizes with a configuration
# parameter.
test_file_sizes = (
@@ -231,11 +245,22 @@
1048576, # 1MB
)
+ # Test names.
+ RTHRU = 'rthru'
+ WTHRU = 'wthru'
+ LAT = 'lat'
+ LIST = 'list'
+
# List of all diagnostic tests.
- ALL_DIAG_TESTS = ('rthru', 'wthru', 'lat')
+ ALL_DIAG_TESTS = (RTHRU, WTHRU, LAT, LIST)
+ # List of diagnostic tests to run by default.
+ DEFAULT_DIAG_TESTS = (RTHRU, WTHRU, LAT)
- # Google Cloud Storage API endpoint host.
- GOOGLE_API_HOST = boto.gs.connection.GSConnection.DefaultHost
+ # Google Cloud Storage XML API endpoint host.
+ XML_API_HOST = boto.config.get(
+ 'Credentials', 'gs_host', boto.gs.connection.GSConnection.DefaultHost)
+ # Google Cloud Storage XML API endpoint port.
+ XML_API_PORT = boto.config.get('Credentials', 'gs_port', 80)
# Maximum number of times to retry requests on 5xx errors.
MAX_SERVER_ERROR_RETRIES = 5
@@ -251,6 +276,10 @@
# to repeat bytes. This number was chosen as the next prime larger than 5 MB.
MAX_UNIQUE_RANDOM_BYTES = 5242883
+ # Maximum amount of time, in seconds, we will wait for object listings to
+ # reflect what we expect in the listing tests.
+ MAX_LISTING_WAIT_TIME = 60.0
+
def _Exec(self, cmd, raise_on_error=True, return_output=False,
mute_stderr=False):
"""Executes a command in a subprocess.
@@ -274,7 +303,7 @@
self.logger.debug('Running command: %s', cmd)
stderr = subprocess.PIPE if mute_stderr else None
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=stderr)
- (stdoutdata, stderrdata) = p.communicate()
+ (stdoutdata, _) = p.communicate()
if raise_on_error and p.returncode:
raise CommandException("Received non-zero return code (%d) from "
"subprocess '%s'." % (p.returncode, ' '.join(cmd)))
@@ -309,7 +338,7 @@
self.file_sizes[fpath] = file_size
random_bytes = os.urandom(min(file_size, self.MAX_UNIQUE_RANDOM_BYTES))
total_bytes = 0
- file_contents = ""
+ file_contents = ''
while total_bytes < file_size:
num_bytes = min(self.MAX_UNIQUE_RANDOM_BYTES, file_size - total_bytes)
file_contents += random_bytes[:num_bytes]
@@ -318,7 +347,7 @@
with os.fdopen(fd, 'wb') as f:
f.write(self.file_contents[fpath])
with open(fpath, 'rb') as f:
- self.file_md5s[fpath] = compute_md5(f)
+ self.file_md5s[fpath] = CalculateB64EncodedMd5FromContents(f)
return fpath
# Create files for latency tests.
@@ -329,16 +358,16 @@
# Creating a file for warming up the TCP connection.
self.tcp_warmup_file = _MakeFile(5 * 1024 * 1024) # 5 Megabytes.
# Remote file to use for TCP warmup.
- self.tcp_warmup_remote_file = (str(self.bucket_uri) +
+ self.tcp_warmup_remote_file = (str(self.bucket_url) +
os.path.basename(self.tcp_warmup_file))
# Local file on disk for write throughput tests.
self.thru_local_file = _MakeFile(self.thru_filesize)
# Remote file to write/read from during throughput tests.
- self.thru_remote_file = (str(self.bucket_uri) +
+ self.thru_remote_file = (str(self.bucket_url) +
os.path.basename(self.thru_local_file))
# Dummy file buffer to use for downloading that goes nowhere.
- self.devnull = DummyFile()
+ self.discard_sink = DummyFile()
def _TearDown(self):
"""Performs operations to clean things up after performing diagnostics."""
@@ -349,20 +378,19 @@
except OSError:
pass
- cleanup_files = [self.thru_local_file, self.tcp_warmup_file]
- for f in cleanup_files:
+ if self.LAT in self.diag_tests or self.WTHRU in self.diag_tests:
+ cleanup_files = [self.thru_local_file, self.tcp_warmup_file]
+ for f in cleanup_files:
- def _Delete():
- k = self.bucket.key_class(self.bucket)
- k.name = os.path.basename(f)
- try:
- k.delete()
- except boto.exception.BotoServerError as e:
- # Ignore not found errors since it's already gone.
- if e.status != 404:
- raise
+ def _Delete():
+ try:
+ self.gsutil_api.DeleteObject(self.bucket_url.bucket_name,
+ os.path.basename(f),
+ provider=self.provider)
+ except NotFoundException:
+ pass
- self._RunOperation(_Delete)
+ self._RunOperation(_Delete)
@contextlib.contextmanager
def _Time(self, key, bucket):
@@ -399,27 +427,30 @@
# by the remote party or the connection broke because of network issues.
# Only the BotoServerError is counted as a 5xx error towards the retry
# limit.
- exceptions = list(self.bucket.connection.http_exceptions)
- exceptions.append(boto.exception.BotoServerError)
-
success = False
server_error_retried = 0
total_retried = 0
i = 0
+ return_val = None
while not success:
- next_sleep = random.random() * (2 ** i) + 1
+ next_sleep = min(random.random() * (2 ** i) + 1, GetMaxRetryDelay())
try:
- func()
+ return_val = func()
self.total_requests += 1
success = True
- except tuple(exceptions) as e:
+ except tuple(self.exceptions) as e:
total_retried += 1
if total_retried > self.MAX_TOTAL_RETRIES:
self.logger.info('Reached maximum total retries. Not retrying.')
break
- if isinstance(e, boto.exception.BotoServerError):
- if e.status >= 500:
- self.error_responses_by_code[e.status] += 1
+ if (isinstance(e, apiclient_errors.HttpError) or
+ isinstance(e, ServiceException)):
+ if isinstance(e, apiclient_errors.HttpError):
+ status = e.resp.status
+ else:
+ status = e.status
+ if status >= 500:
+ self.error_responses_by_code[status] += 1
self.total_requests += 1
self.request_errors += 1
server_error_retried += 1
@@ -432,7 +463,7 @@
break
else:
self.connection_breaks += 1
- return success
+ return return_val
def _RunLatencyTests(self):
"""Runs latency tests."""
@@ -443,50 +474,62 @@
self.logger.info('\nRunning latency iteration %d...', i+1)
for fpath in self.latency_files:
basename = os.path.basename(fpath)
- gsbucket = str(self.bucket_uri)
- gsuri = gsbucket + basename
+ url = self.bucket_url.Clone()
+ url.object_name = basename
file_size = self.file_sizes[fpath]
readable_file_size = MakeHumanReadable(file_size)
self.logger.info(
- "\nFile of size %(size)s located on disk at '%(fpath)s' being "
- "diagnosed in the cloud at '%(gsuri)s'."
- % {'size': readable_file_size, 'fpath': fpath, 'gsuri': gsuri})
+ "\nFile of size %s located on disk at '%s' being diagnosed in the "
+ "cloud at '%s'.", readable_file_size, fpath, url)
- k = self.bucket.key_class(self.bucket)
- k.BufferSize = self.KEY_BUFFER_SIZE
- k.key = basename
+ upload_target = StorageUrlToUploadObjectMetadata(url)
def _Upload():
+ io_fp = cStringIO.StringIO(self.file_contents[fpath])
with self._Time('UPLOAD_%d' % file_size, self.results['latency']):
- k.set_contents_from_string(self.file_contents[fpath],
- md5=self.file_md5s[fpath])
+ self.gsutil_api.UploadObject(
+ io_fp, upload_target, size=file_size, provider=self.provider,
+ fields=['name'])
self._RunOperation(_Upload)
def _Metadata():
with self._Time('METADATA_%d' % file_size, self.results['latency']):
- k.exists()
- self._RunOperation(_Metadata)
+ return self.gsutil_api.GetObjectMetadata(
+ url.bucket_name, url.object_name,
+ provider=self.provider, fields=['name', 'contentType',
+ 'mediaLink', 'size'])
+ # Download will get the metadata first if we don't pass it in.
+ download_metadata = self._RunOperation(_Metadata)
+ serialization_dict = GetDownloadSerializationDict(download_metadata)
+ serialization_data = json.dumps(serialization_dict)
def _Download():
with self._Time('DOWNLOAD_%d' % file_size, self.results['latency']):
- k.get_contents_to_file(self.devnull,
- **self.get_contents_to_file_args)
+ self.gsutil_api.GetObjectMedia(
+ url.bucket_name, url.object_name, self.discard_sink,
+ provider=self.provider, serialization_data=serialization_data)
self._RunOperation(_Download)
def _Delete():
with self._Time('DELETE_%d' % file_size, self.results['latency']):
- k.delete()
+ self.gsutil_api.DeleteObject(url.bucket_name, url.object_name,
+ provider=self.provider)
self._RunOperation(_Delete)
+ class _CpFilter(logging.Filter):
- class _CpFilter(logging.Filter):
def filter(self, record):
# Used to prevent cp._LogCopyOperation from spewing output from
# subprocesses about every iteration.
msg = record.getMessage()
- return not (('Copying file:///' in msg) or ('Copying gs://' in msg))
+ return not (('Copying file:///' in msg) or ('Copying gs://' in msg) or
+ ('Computing CRC' in msg))
+ def _PerfdiagExceptionHandler(self, e):
+ """Simple exception handler to allow post-completion status."""
+ self.logger.error(str(e))
+
def _RunReadThruTests(self):
"""Runs read throughput tests."""
self.results['read_throughput'] = {'file_size': self.thru_filesize,
@@ -495,49 +538,64 @@
'threads': self.threads}
# Copy the TCP warmup file.
- warmup_key = self.bucket.key_class(self.bucket)
- warmup_key.key = os.path.basename(self.tcp_warmup_file)
+ warmup_url = self.bucket_url.Clone()
+ warmup_url.object_name = os.path.basename(self.tcp_warmup_file)
+ warmup_target = StorageUrlToUploadObjectMetadata(warmup_url)
def _Upload1():
- warmup_key.set_contents_from_string(
- self.file_contents[self.tcp_warmup_file],
- md5=self.file_md5s[self.tcp_warmup_file])
+ self.gsutil_api.UploadObject(
+ cStringIO.StringIO(self.file_contents[self.tcp_warmup_file]),
+ warmup_target, provider=self.provider, fields=['name'])
self._RunOperation(_Upload1)
# Copy the file to remote location before reading.
- k = self.bucket.key_class(self.bucket)
- k.BufferSize = self.KEY_BUFFER_SIZE
- k.key = os.path.basename(self.thru_local_file)
+ thru_url = self.bucket_url.Clone()
+ thru_url.object_name = os.path.basename(self.thru_local_file)
+ thru_target = StorageUrlToUploadObjectMetadata(thru_url)
+ thru_target.md5Hash = self.file_md5s[self.thru_local_file]
+ # Get the mediaLink here so that we can pass it to download.
def _Upload2():
- k.set_contents_from_string(self.file_contents[self.thru_local_file],
- md5=self.file_md5s[self.thru_local_file])
- self._RunOperation(_Upload2)
+ return self.gsutil_api.UploadObject(
+ cStringIO.StringIO(self.file_contents[self.thru_local_file]),
+ thru_target, provider=self.provider, size=self.thru_filesize,
+ fields=['name', 'mediaLink', 'size'])
+ # Get the metadata for the object so that we are just measuring performance
+ # on the actual bytes transfer.
+ download_metadata = self._RunOperation(_Upload2)
+ serialization_dict = GetDownloadSerializationDict(download_metadata)
+ serialization_data = json.dumps(serialization_dict)
+
if self.processes == 1 and self.threads == 1:
# Warm up the TCP connection.
def _Warmup():
- warmup_key.get_contents_to_file(self.devnull,
- **self.get_contents_to_file_args)
+ self.gsutil_api.GetObjectMedia(warmup_url.bucket_name,
+ warmup_url.object_name,
+ self.discard_sink,
+ provider=self.provider)
self._RunOperation(_Warmup)
times = []
def _Download():
t0 = time.time()
- k.get_contents_to_file(self.devnull, **self.get_contents_to_file_args)
+ self.gsutil_api.GetObjectMedia(
+ thru_url.bucket_name, thru_url.object_name, self.discard_sink,
+ provider=self.provider, serialization_data=serialization_data)
t1 = time.time()
times.append(t1 - t0)
for _ in range(self.num_iterations):
self._RunOperation(_Download)
time_took = sum(times)
else:
- args = [k] * self.num_iterations
+ args = ([(thru_url.bucket_name, thru_url.object_name, serialization_data)]
+ * self.num_iterations)
self.logger.addFilter(self._CpFilter())
t0 = time.time()
- self.Apply(_DownloadKey,
+ self.Apply(_DownloadWrapper,
args,
_PerfdiagExceptionHandler,
arg_checker=DummyArgChecker,
@@ -561,36 +619,62 @@
'processes': self.processes,
'threads': self.threads}
- k = self.bucket.key_class(self.bucket)
- k.BufferSize = self.KEY_BUFFER_SIZE
- k.key = os.path.basename(self.thru_local_file)
+ warmup_url = self.bucket_url.Clone()
+ warmup_url.object_name = os.path.basename(self.tcp_warmup_file)
+ warmup_target = StorageUrlToUploadObjectMetadata(warmup_url)
+
+ thru_url = self.bucket_url.Clone()
+ thru_url.object_name = os.path.basename(self.thru_local_file)
+ thru_target = StorageUrlToUploadObjectMetadata(thru_url)
+ thru_tuples = []
+ for i in xrange(self.num_iterations):
+ # Create a unique name for each uploaded object. Otherwise,
+ # the XML API would fail when trying to non-atomically get metadata
+ # for the object that gets blown away by the overwrite.
+ thru_tuples.append(UploadObjectTuple(
+ thru_target.bucket, thru_target.name + str(i),
+ filepath=self.thru_local_file))
+
if self.processes == 1 and self.threads == 1:
# Warm up the TCP connection.
- warmup_key = self.bucket.key_class(self.bucket)
- warmup_key.key = os.path.basename(self.tcp_warmup_file)
-
def _Warmup():
- warmup_key.set_contents_from_string(
- self.file_contents[self.tcp_warmup_file],
- md5=self.file_md5s[self.tcp_warmup_file])
+ self.gsutil_api.UploadObject(
+ cStringIO.StringIO(self.file_contents[self.tcp_warmup_file]),
+ warmup_target, provider=self.provider, size=self.thru_filesize,
+ fields=['name'])
self._RunOperation(_Warmup)
times = []
- def _Upload():
- t0 = time.time()
- k.set_contents_from_string(self.file_contents[self.thru_local_file],
- md5=self.file_md5s[self.thru_local_file])
- t1 = time.time()
- times.append(t1 - t0)
- for _ in range(self.num_iterations):
+ for i in xrange(self.num_iterations):
+ thru_tuple = thru_tuples[i]
+ def _Upload():
+ """Uploads the write throughput measurement object."""
+ upload_target = apitools_messages.Object(
+ bucket=thru_tuple.bucket_name, name=thru_tuple.object_name,
+ md5Hash=thru_tuple.md5)
+ io_fp = cStringIO.StringIO(self.file_contents[self.thru_local_file])
+ t0 = time.time()
+ if self.thru_filesize < ResumableThreshold():
+ self.gsutil_api.UploadObject(
+ io_fp, upload_target, provider=self.provider,
+ size=self.thru_filesize, fields=['name'])
+ else:
+ self.gsutil_api.UploadObjectResumable(
+ io_fp, upload_target, provider=self.provider,
+ size=self.thru_filesize, fields=['name'],
+ tracker_callback=_DummyTrackerCallback)
+
+ t1 = time.time()
+ times.append(t1 - t0)
+
self._RunOperation(_Upload)
time_took = sum(times)
else:
- args = [k] * self.num_iterations
+ args = thru_tuples
t0 = time.time()
- self.Apply(_UploadKey,
+ self.Apply(_UploadWrapper,
args,
_PerfdiagExceptionHandler,
arg_checker=DummyArgChecker,
@@ -607,6 +691,136 @@
self.results['write_throughput']['total_bytes_copied'] = total_bytes_copied
self.results['write_throughput']['bytes_per_second'] = bytes_per_second
+ def _RunListTests(self):
+ """Runs eventual consistency listing latency tests."""
+ self.results['listing'] = {'num_files': self.num_iterations}
+
+ # Generate N random object names to put in the bucket.
+ list_prefix = 'gsutil-perfdiag-list-'
+ list_objects = []
+ for _ in xrange(self.num_iterations):
+ list_objects.append(
+ u'%s%s' % (list_prefix, os.urandom(20).encode('hex')))
+
+ # Add the objects to the bucket.
+ self.logger.info(
+ '\nWriting %s objects for listing test...', self.num_iterations)
+ empty_md5 = CalculateB64EncodedMd5FromContents(cStringIO.StringIO(''))
+ args = [
+ UploadObjectTuple(self.bucket_url.bucket_name, name, md5=empty_md5,
+ contents='') for name in list_objects]
+ self.Apply(_UploadWrapper, args, _PerfdiagExceptionHandler,
+ arg_checker=DummyArgChecker)
+
+ list_latencies = []
+ files_seen = []
+ total_start_time = time.time()
+ expected_objects = set(list_objects)
+ found_objects = set()
+
+ def _List():
+ """Lists and returns objects in the bucket. Also records latency."""
+ t0 = time.time()
+ objects = list(self.gsutil_api.ListObjects(
+ self.bucket_url.bucket_name, prefix=list_prefix, delimiter='/',
+ provider=self.provider, fields=['items/name']))
+ t1 = time.time()
+ list_latencies.append(t1 - t0)
+ return set([obj.data.name for obj in objects])
+
+ self.logger.info(
+ 'Listing bucket %s waiting for %s objects to appear...',
+ self.bucket_url.bucket_name, self.num_iterations)
+ while expected_objects - found_objects:
+ def _ListAfterUpload():
+ names = _List()
+ found_objects.update(names & expected_objects)
+ files_seen.append(len(found_objects))
+ self._RunOperation(_ListAfterUpload)
+ if expected_objects - found_objects:
+ if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME:
+ self.logger.warning('Maximum time reached waiting for listing.')
+ break
+ total_end_time = time.time()
+
+ self.results['listing']['insert'] = {
+ 'num_listing_calls': len(list_latencies),
+ 'list_latencies': list_latencies,
+ 'files_seen_after_listing': files_seen,
+ 'time_took': total_end_time - total_start_time,
+ }
+
+ self.logger.info(
+ 'Deleting %s objects for listing test...', self.num_iterations)
+ self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler,
+ arg_checker=DummyArgChecker)
+
+ self.logger.info(
+ 'Listing bucket %s waiting for %s objects to disappear...',
+ self.bucket_url.bucket_name, self.num_iterations)
+ list_latencies = []
+ files_seen = []
+ total_start_time = time.time()
+ found_objects = set(list_objects)
+ while found_objects:
+ def _ListAfterDelete():
+ names = _List()
+ found_objects.intersection_update(names)
+ files_seen.append(len(found_objects))
+ self._RunOperation(_ListAfterDelete)
+ if found_objects:
+ if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME:
+ self.logger.warning('Maximum time reached waiting for listing.')
+ break
+ total_end_time = time.time()
+
+ self.results['listing']['delete'] = {
+ 'num_listing_calls': len(list_latencies),
+ 'list_latencies': list_latencies,
+ 'files_seen_after_listing': files_seen,
+ 'time_took': total_end_time - total_start_time,
+ }
+
+ def Upload(self, thru_tuple, thread_state=None):
+ gsutil_api = GetCloudApiInstance(self, thread_state)
+
+ md5hash = thru_tuple.md5
+ contents = thru_tuple.contents
+ if thru_tuple.filepath:
+ md5hash = self.file_md5s[thru_tuple.filepath]
+ contents = self.file_contents[thru_tuple.filepath]
+
+ upload_target = apitools_messages.Object(
+ bucket=thru_tuple.bucket_name, name=thru_tuple.object_name,
+ md5Hash=md5hash)
+ file_size = len(contents)
+ if file_size < ResumableThreshold():
+ gsutil_api.UploadObject(
+ cStringIO.StringIO(contents), upload_target,
+ provider=self.provider, size=file_size, fields=['name'])
+ else:
+ gsutil_api.UploadObjectResumable(
+ cStringIO.StringIO(contents), upload_target,
+ provider=self.provider, size=file_size, fields=['name'],
+ tracker_callback=_DummyTrackerCallback)
+
+ def Download(self, download_tuple, thread_state=None):
+ """Downloads a file.
+
+ Args:
+ download_tuple: (bucket name, object name, serialization data for object).
+ thread_state: gsutil Cloud API instance to use for the download.
+ """
+ gsutil_api = GetCloudApiInstance(self, thread_state)
+ gsutil_api.GetObjectMedia(
+ download_tuple[0], download_tuple[1], self.discard_sink,
+ provider=self.provider, serialization_data=download_tuple[2])
+
+ def Delete(self, thru_tuple, thread_state=None):
+ gsutil_api = thread_state or self.gsutil_api
+ gsutil_api.DeleteObject(
+ thru_tuple.bucket_name, thru_tuple.object_name, provider=self.provider)
+
def _GetDiskCounters(self):
"""Retrieves disk I/O statistics for all disks.
@@ -702,12 +916,22 @@
"""Collects system information."""
sysinfo = {}
- # All exceptions that might be thrown from socket module calls.
+ # All exceptions that might be raised from socket module calls.
socket_errors = (
socket.error, socket.herror, socket.gaierror, socket.timeout)
# Find out whether HTTPS is enabled in Boto.
sysinfo['boto_https_enabled'] = boto.config.get('Boto', 'is_secure', True)
+
+ # Look up proxy info.
+ proxy_host = boto.config.get('Boto', 'proxy', None)
+ proxy_port = boto.config.getint('Boto', 'proxy_port', 0)
+ sysinfo['using_proxy'] = bool(proxy_host)
+
+ if boto.config.get('Boto', 'proxy_rdns', False):
+ self.logger.info('DNS lookups are disallowed in this environment, so '
+ 'some information is not included in this perfdiag run.')
+
# Get the local IP address from socket lib.
try:
sysinfo['ip_address'] = socket.gethostbyname(socket.gethostname())
@@ -723,27 +947,38 @@
# Execute a CNAME lookup on Google DNS to find what Google server
# it's routing to.
- cmd = ['nslookup', '-type=CNAME', self.GOOGLE_API_HOST]
+ cmd = ['nslookup', '-type=CNAME', self.XML_API_HOST]
try:
nslookup_cname_output = self._Exec(cmd, return_output=True)
m = re.search(r' = (?P<googserv>[^.]+)\.', nslookup_cname_output)
sysinfo['googserv_route'] = m.group('googserv') if m else None
- except OSError:
+ except (CommandException, OSError):
sysinfo['googserv_route'] = ''
+ # Try to determine the latency of a DNS lookup for the Google hostname
+ # endpoint. Note: we don't piggyback on gethostbyname_ex below because
+ # the _ex version requires an extra RTT.
+ try:
+ t0 = time.time()
+ socket.gethostbyname(self.XML_API_HOST)
+ t1 = time.time()
+ sysinfo['google_host_dns_latency'] = t1 - t0
+ except socket_errors:
+ pass
+
# Look up IP addresses for Google Server.
try:
- (hostname, aliaslist, ipaddrlist) = socket.gethostbyname_ex(
- self.GOOGLE_API_HOST)
+ (hostname, _, ipaddrlist) = socket.gethostbyname_ex(self.XML_API_HOST)
sysinfo['googserv_ips'] = ipaddrlist
except socket_errors:
+ ipaddrlist = []
sysinfo['googserv_ips'] = []
# Reverse lookup the hostnames for the Google Server IPs.
sysinfo['googserv_hostnames'] = []
for googserv_ip in ipaddrlist:
try:
- (hostname, aliaslist, ipaddrlist) = socket.gethostbyaddr(googserv_ip)
+ (hostname, _, ipaddrlist) = socket.gethostbyaddr(googserv_ip)
sysinfo['googserv_hostnames'].append(hostname)
except socket_errors:
pass
@@ -754,9 +989,43 @@
nslookup_txt_output = self._Exec(cmd, return_output=True)
m = re.search(r'text\s+=\s+"(?P<dnsip>[\.\d]+)"', nslookup_txt_output)
sysinfo['dns_o-o_ip'] = m.group('dnsip') if m else None
- except OSError:
+ except (CommandException, OSError):
sysinfo['dns_o-o_ip'] = ''
+ # Try to determine the latency of connecting to the Google hostname
+ # endpoint.
+ sysinfo['google_host_connect_latencies'] = {}
+ for googserv_ip in ipaddrlist:
+ try:
+ sock = socket.socket()
+ t0 = time.time()
+ sock.connect((googserv_ip, self.XML_API_PORT))
+ t1 = time.time()
+ sysinfo['google_host_connect_latencies'][googserv_ip] = t1 - t0
+ except socket_errors:
+ pass
+
+ # If using a proxy, try to determine the latency of a DNS lookup to resolve
+ # the proxy hostname and the latency of connecting to the proxy.
+ if proxy_host:
+ proxy_ip = None
+ try:
+ t0 = time.time()
+ proxy_ip = socket.gethostbyname(proxy_host)
+ t1 = time.time()
+ sysinfo['proxy_dns_latency'] = t1 - t0
+ except socket_errors:
+ pass
+
+ try:
+ sock = socket.socket()
+ t0 = time.time()
+ sock.connect((proxy_ip or proxy_host, proxy_port))
+ t1 = time.time()
+ sysinfo['proxy_host_connect_latency'] = t1 - t0
+ except socket_errors:
+ pass
+
# Try and find the number of CPUs in the system if available.
try:
sysinfo['cpu_count'] = multiprocessing.cpu_count()
@@ -902,6 +1171,33 @@
print 'Read throughput: %s/s.' % (
MakeBitsHumanReadable(read_thru['bytes_per_second'] * 8))
+ if 'listing' in self.results:
+ print
+ print '-' * 78
+ print 'Listing'.center(78)
+ print '-' * 78
+
+ listing = self.results['listing']
+ insert = listing['insert']
+ delete = listing['delete']
+ print 'After inserting %s objects:' % listing['num_files']
+ print (' Total time for objects to appear: %.2g seconds' %
+ insert['time_took'])
+ print ' Number of listing calls made: %s' % insert['num_listing_calls']
+ print (' Individual listing call latencies: [%s]' %
+ ', '.join('%.2gs' % lat for lat in insert['list_latencies']))
+ print (' Files reflected after each call: [%s]' %
+ ', '.join(map(str, insert['files_seen_after_listing'])))
+
+ print 'After deleting %s objects:' % listing['num_files']
+ print (' Total time for objects to disappear: %.2g seconds' %
+ delete['time_took'])
+ print ' Number of listing calls made: %s' % delete['num_listing_calls']
+ print (' Individual listing call latencies: [%s]' %
+ ', '.join('%.2gs' % lat for lat in delete['list_latencies']))
+ print (' Files reflected after each call: [%s]' %
+ ', '.join(map(str, delete['files_seen_after_listing'])))
+
if 'sysinfo' in self.results:
print
print '-' * 78
@@ -989,6 +1285,26 @@
if 'boto_https_enabled' in info:
print 'Boto HTTPS Enabled: \n %s' % info['boto_https_enabled']
+ if 'using_proxy' in info:
+ print 'Requests routed through proxy: \n %s' % info['using_proxy']
+
+ if 'google_host_dns_latency' in info:
+ print ('Latency of the DNS lookup for Google Storage server (ms): '
+ '\n %.1f' % (info['google_host_dns_latency'] * 1000.0))
+
+ if 'google_host_connect_latencies' in info:
+ print 'Latencies connecting to Google Storage server IPs (ms):'
+ for ip, latency in info['google_host_connect_latencies'].iteritems():
+ print ' %s = %.1f' % (ip, latency * 1000.0)
+
+ if 'proxy_dns_latency' in info:
+ print ('Latency of the DNS lookup for the configured proxy (ms): '
+ '\n %.1f' % (info['proxy_dns_latency'] * 1000.0))
+
+ if 'proxy_host_connect_latency' in info:
+ print ('Latency connecting to the configured proxy (ms): \n %.1f' %
+ (info['proxy_host_connect_latency'] * 1000.0))
+
if 'request_errors' in self.results and 'total_requests' in self.results:
print
print '-' * 78
@@ -1050,7 +1366,7 @@
# From -s.
self.thru_filesize = 1048576
# From -t.
- self.diag_tests = self.ALL_DIAG_TESTS
+ self.diag_tests = self.DEFAULT_DIAG_TESTS
# From -o.
self.output_file = None
# From -i.
@@ -1109,20 +1425,20 @@
if not self.args:
raise CommandException('Wrong number of arguments for "perfdiag" '
'command.')
- self.bucket_uri = self.suri_builder.StorageUri(self.args[0])
- if not self.bucket_uri.names_bucket():
- raise CommandException('The perfdiag command requires a URI that '
+
+ self.bucket_url = StorageUrlFromString(self.args[0])
+ self.provider = self.bucket_url.scheme
+ if not (self.bucket_url.IsCloudUrl() and self.bucket_url.IsBucket()):
+ raise CommandException('The perfdiag command requires a URL that '
'specifies a bucket.\n"%s" is not '
- 'valid.' % self.bucket_uri)
- self.bucket = self.bucket_uri.get_bucket()
+ 'valid.' % self.args[0])
+ # Ensure the bucket exists.
+ self.gsutil_api.GetBucket(self.bucket_url.bucket_name,
+ provider=self.bucket_url.scheme,
+ fields=['id'])
+ self.exceptions = [httplib.HTTPException, socket.error, socket.gaierror,
+ httplib.BadStatusLine, ServiceException]
- # TODO: Add MD5 argument support to get_contents_to_file()
- # and pass the file md5 as a parameter to avoid any unnecessary
- # computation.
- self.get_contents_to_file_args = {}
- if self.bucket_uri.scheme == 'gs':
- self.get_contents_to_file_args = {'hash_algs': {}}
-
# Command entry point.
def RunCommand(self):
"""Called by gsutil when the command is being invoked."""
@@ -1144,7 +1460,7 @@
'Throughput file size: %s\n'
'Diagnostics to run: %s',
self.num_iterations,
- self.bucket_uri,
+ self.bucket_url,
self.processes,
self.threads,
MakeHumanReadable(self.thru_filesize),
@@ -1159,17 +1475,19 @@
self.results['sysinfo']['netstat_start'] = self._GetTcpStats()
if IS_LINUX:
self.results['sysinfo']['disk_counters_start'] = self._GetDiskCounters()
- # Record bucket URI.
- self.results['bucket_uri'] = str(self.bucket_uri)
+ # Record bucket URL.
+ self.results['bucket_uri'] = str(self.bucket_url)
self.results['json_format'] = 'perfdiag'
self.results['metadata'] = self.metadata_keys
- if 'lat' in self.diag_tests:
+ if self.LAT in self.diag_tests:
self._RunLatencyTests()
- if 'rthru' in self.diag_tests:
+ if self.RTHRU in self.diag_tests:
self._RunReadThruTests()
- if 'wthru' in self.diag_tests:
+ if self.WTHRU in self.diag_tests:
self._RunWriteThruTests()
+ if self.LIST in self.diag_tests:
+ self._RunListTests()
# Collect netstat info and disk counters after tests.
self.results['sysinfo']['netstat_end'] = self._GetTcpStats()
@@ -1188,3 +1506,48 @@
self._TearDown()
return 0
+
+
class UploadObjectTuple(object):
  """Picklable tuple with necessary metadata for an insert object call."""

  def __init__(self, bucket_name, object_name, filepath=None, md5=None,
               contents=None):
    """Create an upload tuple.

    Args:
      bucket_name: Name of the bucket to upload to.
      object_name: Name of the object to upload to.
      filepath: A file path located in self.file_contents and self.file_md5s.
      md5: The MD5 hash of the object being uploaded.
      contents: The contents of the file to be uploaded.

    Note: (contents + md5) and filepath are mutually exclusive. You may
    specify one or the other, but not both.
    Note: If one of contents or md5 is specified, they must both be
    specified.

    Raises:
      InvalidArgument: if the arguments are invalid.
    """
    # Validate before assigning so an invalid argument combination never
    # produces a partially-initialized tuple.
    # 'contents is not None' (rather than truthiness) deliberately allows
    # uploading an empty string as object contents.
    if filepath and (md5 or contents is not None):
      raise InvalidArgument(
          'Only one of filepath or (md5 + contents) may be specified.')
    if not filepath and (not md5 or contents is None):
      raise InvalidArgument(
          'Both md5 and contents must be specified.')
    self.bucket_name = bucket_name
    self.object_name = object_name
    self.filepath = filepath
    self.md5 = md5
    self.contents = contents
+
+
def StorageUrlToUploadObjectMetadata(storage_url):
  """Converts a cloud object's StorageUrl to an apitools Object message.

  Args:
    storage_url: StorageUrl that names a cloud object (e.g. gs://bucket/obj).

  Returns:
    apitools Object message with the name and bucket fields populated from
    the URL.

  Raises:
    CommandException: if storage_url does not name a cloud object.
  """
  if storage_url.IsCloudUrl() and storage_url.IsObject():
    upload_target = apitools_messages.Object()
    upload_target.name = storage_url.object_name
    upload_target.bucket = storage_url.bucket_name
    return upload_target
  else:
    # This indicates a programming error in perfdiag, not bad user input.
    raise CommandException('Non-cloud URL upload target %s was created in '
                           'perfdiag implementation.' % storage_url)
« no previous file with comments | « gslib/commands/notification.py ('k') | gslib/commands/rb.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698