| Index: gslib/commands/perfdiag.py
|
| ===================================================================
|
| --- gslib/commands/perfdiag.py (revision 33376)
|
| +++ gslib/commands/perfdiag.py (working copy)
|
| @@ -1,3 +1,4 @@
|
| +# -*- coding: utf-8 -*-
|
| # Copyright 2012 Google Inc. All Rights Reserved.
|
| #
|
| # Licensed under the Apache License, Version 2.0 (the "License");
|
| @@ -11,16 +12,16 @@
|
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| # See the License for the specific language governing permissions and
|
| # limitations under the License.
|
| -
|
| """Contains the perfdiag gsutil command."""
|
|
|
| -# Get the system logging module, not our local logging module.
|
| from __future__ import absolute_import
|
|
|
| import calendar
|
| from collections import defaultdict
|
| import contextlib
|
| +import cStringIO
|
| import datetime
|
| +import httplib
|
| import json
|
| import logging
|
| import math
|
| @@ -34,48 +35,42 @@
|
| import tempfile
|
| import time
|
|
|
| +from apiclient import errors as apiclient_errors
|
| import boto
|
| -from boto.utils import compute_md5
|
| import boto.gs.connection
|
|
|
| import gslib
|
| +from gslib.cloud_api import NotFoundException
|
| +from gslib.cloud_api import ServiceException
|
| +from gslib.cloud_api_helper import GetDownloadSerializationDict
|
| from gslib.command import Command
|
| -from gslib.command import COMMAND_NAME
|
| -from gslib.command import COMMAND_NAME_ALIASES
|
| from gslib.command import DummyArgChecker
|
| -from gslib.command import FILE_URIS_OK
|
| -from gslib.command import MAX_ARGS
|
| -from gslib.command import MIN_ARGS
|
| -from gslib.command import PROVIDER_URIS_OK
|
| -from gslib.command import SUPPORTED_SUB_ARGS
|
| -from gslib.command import URIS_START_ARG
|
| -#from gslib.command_runner import CommandRunner
|
| from gslib.commands import config
|
| +from gslib.cs_api_map import ApiSelector
|
| from gslib.exception import CommandException
|
| -from gslib.help_provider import HELP_NAME
|
| -from gslib.help_provider import HELP_NAME_ALIASES
|
| -from gslib.help_provider import HELP_ONE_LINE_SUMMARY
|
| -from gslib.help_provider import HELP_TEXT
|
| -from gslib.help_provider import HELP_TYPE
|
| -from gslib.help_provider import HelpType
|
| -from gslib.util import GetBotoConfigFileList
|
| +from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
|
| +from gslib.storage_url import StorageUrlFromString
|
| +from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
|
| +from gslib.util import GetCloudApiInstance
|
| +from gslib.util import GetMaxRetryDelay
|
| from gslib.util import HumanReadableToBytes
|
| from gslib.util import IS_LINUX
|
| from gslib.util import MakeBitsHumanReadable
|
| from gslib.util import MakeHumanReadable
|
| from gslib.util import Percentile
|
| +from gslib.util import ResumableThreshold
|
|
|
| -_detailed_help_text = ("""
|
| +_DETAILED_HELP_TEXT = ("""
|
| <B>SYNOPSIS</B>
|
| gsutil perfdiag [-i in.json] [-o out.json] [-n iterations] [-c processes]
|
| - [-k threads] [-s size] [-t tests] uri...
|
| + [-k threads] [-s size] [-t tests] url...
|
|
|
|
|
| <B>DESCRIPTION</B>
|
| The perfdiag command runs a suite of diagnostic tests for a given Google
|
| Storage bucket.
|
|
|
| - The 'uri' parameter must name an existing bucket (e.g. gs://foo) to which
|
| + The 'url' parameter must name an existing bucket (e.g. gs://foo) to which
|
| the user has write permission. Several test files will be uploaded to and
|
| downloaded from this bucket. All test files will be deleted at the completion
|
| of the diagnostic if it finishes successfully.
|
| @@ -122,6 +117,13 @@
|
| retrieving its metadata, reading the file, and deleting
|
| the file. Records the latency of each operation.
|
|
|
| + list
|
| + Write N (set with -n) objects to the bucket, record how long
|
| + it takes for the eventually consistent listing call to return
|
| + the N objects in its result, delete the N objects, then record
|
| + how long it takes listing to stop returning the N objects.
|
| + This test is off by default.
|
| +
|
| rthru
|
| Runs N (set with -n) read operations, with at most C
|
| (set with -c) reads outstanding at any given time.
|
| @@ -163,65 +165,77 @@
|
| <B>NOTE</B>
|
| The perfdiag command collects system information. It collects your IP address,
|
| executes DNS queries to Google servers and collects the results, and collects
|
| - network statistics information from the output of netstat -s. None of this
|
| - information will be sent to Google unless you choose to send it.
|
| + network statistics information from the output of netstat -s. It will also
|
| + attempt to connect to your proxy server if you have one configured. None of
|
| + this information will be sent to Google unless you choose to send it.
|
| """)
|
|
|
| -def _DownloadKey(cls, key):
|
| - key.get_contents_to_file(cls.devnull, **cls.get_contents_to_file_args)
|
| -
|
| -def _UploadKey(cls, key):
|
| - return key.set_contents_from_string(cls.file_contents[cls.thru_local_file],
|
| - md5=cls.file_md5s[cls.thru_local_file])
|
| -
|
| +
|
| +class Error(Exception):
|
| + """Base exception class for this module."""
|
| + pass
|
| +
|
| +
|
| +class InvalidArgument(Error):
|
| + """Raised on invalid arguments to functions."""
|
| + pass
|
| +
|
| +
|
| +def _DownloadWrapper(cls, arg, thread_state=None):
|
| + cls.Download(arg, thread_state=thread_state)
|
| +
|
| +
|
| +def _UploadWrapper(cls, arg, thread_state=None):
|
| + cls.Upload(arg, thread_state=thread_state)
|
| +
|
| +
|
| +def _DeleteWrapper(cls, arg, thread_state=None):
|
| + cls.Delete(arg, thread_state=thread_state)
|
| +
|
| +
|
| def _PerfdiagExceptionHandler(cls, e):
|
| """Simple exception handler to allow post-completion status."""
|
| cls.logger.error(str(e))
|
| -
|
|
|
| +
|
| +def _DummyTrackerCallback(_):
|
| + pass
|
| +
|
| +
|
| class DummyFile(object):
|
| """A dummy, file-like object that throws away everything written to it."""
|
|
|
| - def write(self, *args, **kwargs): # pylint: disable-msg=C6409
|
| + def write(self, *args, **kwargs): # pylint: disable=invalid-name
|
| pass
|
|
|
|
|
| class PerfDiagCommand(Command):
|
| """Implementation of gsutil perfdiag command."""
|
|
|
| - # Command specification (processed by parent class).
|
| - command_spec = {
|
| - # Name of command.
|
| - COMMAND_NAME: 'perfdiag',
|
| - # List of command name aliases.
|
| - COMMAND_NAME_ALIASES: ['diag', 'diagnostic', 'perf', 'performance'],
|
| - # Min number of args required by this command.
|
| - MIN_ARGS: 0,
|
| - # Max number of args required by this command, or NO_MAX.
|
| - MAX_ARGS: 1,
|
| - # Getopt-style string specifying acceptable sub args.
|
| - SUPPORTED_SUB_ARGS: 'n:c:k:s:t:m:i:o:',
|
| - # True if file URIs acceptable for this command.
|
| - FILE_URIS_OK: False,
|
| - # True if provider-only URIs acceptable for this command.
|
| - PROVIDER_URIS_OK: False,
|
| - # Index in args of first URI arg.
|
| - URIS_START_ARG: 0,
|
| - }
|
| - help_spec = {
|
| - # Name of command or auxiliary help info for which this help applies.
|
| - HELP_NAME: 'perfdiag',
|
| - # List of help name aliases.
|
| - HELP_NAME_ALIASES: [],
|
| - # Type of help:
|
| - HELP_TYPE: HelpType.COMMAND_HELP,
|
| - # One line summary of this help.
|
| - HELP_ONE_LINE_SUMMARY: 'Run performance diagnostic',
|
| - # The full help text.
|
| - HELP_TEXT: _detailed_help_text,
|
| - }
|
| + # Command specification. See base class for documentation.
|
| + command_spec = Command.CreateCommandSpec(
|
| + 'perfdiag',
|
| + command_name_aliases=['diag', 'diagnostic', 'perf', 'performance'],
|
| + min_args=0,
|
| + max_args=1,
|
| + supported_sub_args='n:c:k:s:t:m:i:o:',
|
| + file_url_ok=False,
|
| + provider_url_ok=False,
|
| + urls_start_arg=0,
|
| + gs_api_support=[ApiSelector.XML, ApiSelector.JSON],
|
| + gs_default_api=ApiSelector.JSON,
|
| + )
|
| + # Help specification. See help_provider.py for documentation.
|
| + help_spec = Command.HelpSpec(
|
| + help_name='perfdiag',
|
| + help_name_aliases=[],
|
| + help_type='command_help',
|
| + help_one_line_summary='Run performance diagnostic',
|
| + help_text=_DETAILED_HELP_TEXT,
|
| + subcommand_help_text={},
|
| + )
|
|
|
| - # Byte sizes to use for testing files.
|
| + # Byte sizes to use for latency testing files.
|
| # TODO: Consider letting the user specify these sizes with a configuration
|
| # parameter.
|
| test_file_sizes = (
|
| @@ -231,11 +245,22 @@
|
| 1048576, # 1MB
|
| )
|
|
|
| + # Test names.
|
| + RTHRU = 'rthru'
|
| + WTHRU = 'wthru'
|
| + LAT = 'lat'
|
| + LIST = 'list'
|
| +
|
| # List of all diagnostic tests.
|
| - ALL_DIAG_TESTS = ('rthru', 'wthru', 'lat')
|
| + ALL_DIAG_TESTS = (RTHRU, WTHRU, LAT, LIST)
|
| + # List of diagnostic tests to run by default.
|
| + DEFAULT_DIAG_TESTS = (RTHRU, WTHRU, LAT)
|
|
|
| - # Google Cloud Storage API endpoint host.
|
| - GOOGLE_API_HOST = boto.gs.connection.GSConnection.DefaultHost
|
| + # Google Cloud Storage XML API endpoint host.
|
| + XML_API_HOST = boto.config.get(
|
| + 'Credentials', 'gs_host', boto.gs.connection.GSConnection.DefaultHost)
|
| + # Google Cloud Storage XML API endpoint port.
|
| + XML_API_PORT = boto.config.get('Credentials', 'gs_port', 80)
|
|
|
| # Maximum number of times to retry requests on 5xx errors.
|
| MAX_SERVER_ERROR_RETRIES = 5
|
| @@ -251,6 +276,10 @@
|
| # to repeat bytes. This number was chosen as the next prime larger than 5 MB.
|
| MAX_UNIQUE_RANDOM_BYTES = 5242883
|
|
|
| + # Maximum amount of time, in seconds, we will wait for object listings to
|
| + # reflect what we expect in the listing tests.
|
| + MAX_LISTING_WAIT_TIME = 60.0
|
| +
|
| def _Exec(self, cmd, raise_on_error=True, return_output=False,
|
| mute_stderr=False):
|
| """Executes a command in a subprocess.
|
| @@ -274,7 +303,7 @@
|
| self.logger.debug('Running command: %s', cmd)
|
| stderr = subprocess.PIPE if mute_stderr else None
|
| p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=stderr)
|
| - (stdoutdata, stderrdata) = p.communicate()
|
| + (stdoutdata, _) = p.communicate()
|
| if raise_on_error and p.returncode:
|
| raise CommandException("Received non-zero return code (%d) from "
|
| "subprocess '%s'." % (p.returncode, ' '.join(cmd)))
|
| @@ -309,7 +338,7 @@
|
| self.file_sizes[fpath] = file_size
|
| random_bytes = os.urandom(min(file_size, self.MAX_UNIQUE_RANDOM_BYTES))
|
| total_bytes = 0
|
| - file_contents = ""
|
| + file_contents = ''
|
| while total_bytes < file_size:
|
| num_bytes = min(self.MAX_UNIQUE_RANDOM_BYTES, file_size - total_bytes)
|
| file_contents += random_bytes[:num_bytes]
|
| @@ -318,7 +347,7 @@
|
| with os.fdopen(fd, 'wb') as f:
|
| f.write(self.file_contents[fpath])
|
| with open(fpath, 'rb') as f:
|
| - self.file_md5s[fpath] = compute_md5(f)
|
| + self.file_md5s[fpath] = CalculateB64EncodedMd5FromContents(f)
|
| return fpath
|
|
|
| # Create files for latency tests.
|
| @@ -329,16 +358,16 @@
|
| # Creating a file for warming up the TCP connection.
|
| self.tcp_warmup_file = _MakeFile(5 * 1024 * 1024) # 5 Megabytes.
|
| # Remote file to use for TCP warmup.
|
| - self.tcp_warmup_remote_file = (str(self.bucket_uri) +
|
| + self.tcp_warmup_remote_file = (str(self.bucket_url) +
|
| os.path.basename(self.tcp_warmup_file))
|
|
|
| # Local file on disk for write throughput tests.
|
| self.thru_local_file = _MakeFile(self.thru_filesize)
|
| # Remote file to write/read from during throughput tests.
|
| - self.thru_remote_file = (str(self.bucket_uri) +
|
| + self.thru_remote_file = (str(self.bucket_url) +
|
| os.path.basename(self.thru_local_file))
|
| # Dummy file buffer to use for downloading that goes nowhere.
|
| - self.devnull = DummyFile()
|
| + self.discard_sink = DummyFile()
|
|
|
| def _TearDown(self):
|
| """Performs operations to clean things up after performing diagnostics."""
|
| @@ -349,20 +378,19 @@
|
| except OSError:
|
| pass
|
|
|
| - cleanup_files = [self.thru_local_file, self.tcp_warmup_file]
|
| - for f in cleanup_files:
|
| + if self.LAT in self.diag_tests or self.WTHRU in self.diag_tests:
|
| + cleanup_files = [self.thru_local_file, self.tcp_warmup_file]
|
| + for f in cleanup_files:
|
|
|
| - def _Delete():
|
| - k = self.bucket.key_class(self.bucket)
|
| - k.name = os.path.basename(f)
|
| - try:
|
| - k.delete()
|
| - except boto.exception.BotoServerError as e:
|
| - # Ignore not found errors since it's already gone.
|
| - if e.status != 404:
|
| - raise
|
| + def _Delete():
|
| + try:
|
| + self.gsutil_api.DeleteObject(self.bucket_url.bucket_name,
|
| + os.path.basename(f),
|
| + provider=self.provider)
|
| + except NotFoundException:
|
| + pass
|
|
|
| - self._RunOperation(_Delete)
|
| + self._RunOperation(_Delete)
|
|
|
| @contextlib.contextmanager
|
| def _Time(self, key, bucket):
|
| @@ -399,27 +427,30 @@
|
| # by the remote party or the connection broke because of network issues.
|
| # Only the BotoServerError is counted as a 5xx error towards the retry
|
| # limit.
|
| - exceptions = list(self.bucket.connection.http_exceptions)
|
| - exceptions.append(boto.exception.BotoServerError)
|
| -
|
| success = False
|
| server_error_retried = 0
|
| total_retried = 0
|
| i = 0
|
| + return_val = None
|
| while not success:
|
| - next_sleep = random.random() * (2 ** i) + 1
|
| + next_sleep = min(random.random() * (2 ** i) + 1, GetMaxRetryDelay())
|
| try:
|
| - func()
|
| + return_val = func()
|
| self.total_requests += 1
|
| success = True
|
| - except tuple(exceptions) as e:
|
| + except tuple(self.exceptions) as e:
|
| total_retried += 1
|
| if total_retried > self.MAX_TOTAL_RETRIES:
|
| self.logger.info('Reached maximum total retries. Not retrying.')
|
| break
|
| - if isinstance(e, boto.exception.BotoServerError):
|
| - if e.status >= 500:
|
| - self.error_responses_by_code[e.status] += 1
|
| + if (isinstance(e, apiclient_errors.HttpError) or
|
| + isinstance(e, ServiceException)):
|
| + if isinstance(e, apiclient_errors.HttpError):
|
| + status = e.resp.status
|
| + else:
|
| + status = e.status
|
| + if status >= 500:
|
| + self.error_responses_by_code[status] += 1
|
| self.total_requests += 1
|
| self.request_errors += 1
|
| server_error_retried += 1
|
| @@ -432,7 +463,7 @@
|
| break
|
| else:
|
| self.connection_breaks += 1
|
| - return success
|
| + return return_val
|
|
|
| def _RunLatencyTests(self):
|
| """Runs latency tests."""
|
| @@ -443,50 +474,62 @@
|
| self.logger.info('\nRunning latency iteration %d...', i+1)
|
| for fpath in self.latency_files:
|
| basename = os.path.basename(fpath)
|
| - gsbucket = str(self.bucket_uri)
|
| - gsuri = gsbucket + basename
|
| + url = self.bucket_url.Clone()
|
| + url.object_name = basename
|
| file_size = self.file_sizes[fpath]
|
| readable_file_size = MakeHumanReadable(file_size)
|
|
|
| self.logger.info(
|
| - "\nFile of size %(size)s located on disk at '%(fpath)s' being "
|
| - "diagnosed in the cloud at '%(gsuri)s'."
|
| - % {'size': readable_file_size, 'fpath': fpath, 'gsuri': gsuri})
|
| + "\nFile of size %s located on disk at '%s' being diagnosed in the "
|
| + "cloud at '%s'.", readable_file_size, fpath, url)
|
|
|
| - k = self.bucket.key_class(self.bucket)
|
| - k.BufferSize = self.KEY_BUFFER_SIZE
|
| - k.key = basename
|
| + upload_target = StorageUrlToUploadObjectMetadata(url)
|
|
|
| def _Upload():
|
| + io_fp = cStringIO.StringIO(self.file_contents[fpath])
|
| with self._Time('UPLOAD_%d' % file_size, self.results['latency']):
|
| - k.set_contents_from_string(self.file_contents[fpath],
|
| - md5=self.file_md5s[fpath])
|
| + self.gsutil_api.UploadObject(
|
| + io_fp, upload_target, size=file_size, provider=self.provider,
|
| + fields=['name'])
|
| self._RunOperation(_Upload)
|
|
|
| def _Metadata():
|
| with self._Time('METADATA_%d' % file_size, self.results['latency']):
|
| - k.exists()
|
| - self._RunOperation(_Metadata)
|
| + return self.gsutil_api.GetObjectMetadata(
|
| + url.bucket_name, url.object_name,
|
| + provider=self.provider, fields=['name', 'contentType',
|
| + 'mediaLink', 'size'])
|
| + # Download will get the metadata first if we don't pass it in.
|
| + download_metadata = self._RunOperation(_Metadata)
|
| + serialization_dict = GetDownloadSerializationDict(download_metadata)
|
| + serialization_data = json.dumps(serialization_dict)
|
|
|
| def _Download():
|
| with self._Time('DOWNLOAD_%d' % file_size, self.results['latency']):
|
| - k.get_contents_to_file(self.devnull,
|
| - **self.get_contents_to_file_args)
|
| + self.gsutil_api.GetObjectMedia(
|
| + url.bucket_name, url.object_name, self.discard_sink,
|
| + provider=self.provider, serialization_data=serialization_data)
|
| self._RunOperation(_Download)
|
|
|
| def _Delete():
|
| with self._Time('DELETE_%d' % file_size, self.results['latency']):
|
| - k.delete()
|
| + self.gsutil_api.DeleteObject(url.bucket_name, url.object_name,
|
| + provider=self.provider)
|
| self._RunOperation(_Delete)
|
|
|
| + class _CpFilter(logging.Filter):
|
|
|
| - class _CpFilter(logging.Filter):
|
| def filter(self, record):
|
| # Used to prevent cp._LogCopyOperation from spewing output from
|
| # subprocesses about every iteration.
|
| msg = record.getMessage()
|
| - return not (('Copying file:///' in msg) or ('Copying gs://' in msg))
|
| + return not (('Copying file:///' in msg) or ('Copying gs://' in msg) or
|
| + ('Computing CRC' in msg))
|
|
|
| + def _PerfdiagExceptionHandler(self, e):
|
| + """Simple exception handler to allow post-completion status."""
|
| + self.logger.error(str(e))
|
| +
|
| def _RunReadThruTests(self):
|
| """Runs read throughput tests."""
|
| self.results['read_throughput'] = {'file_size': self.thru_filesize,
|
| @@ -495,49 +538,64 @@
|
| 'threads': self.threads}
|
|
|
| # Copy the TCP warmup file.
|
| - warmup_key = self.bucket.key_class(self.bucket)
|
| - warmup_key.key = os.path.basename(self.tcp_warmup_file)
|
| + warmup_url = self.bucket_url.Clone()
|
| + warmup_url.object_name = os.path.basename(self.tcp_warmup_file)
|
| + warmup_target = StorageUrlToUploadObjectMetadata(warmup_url)
|
|
|
| def _Upload1():
|
| - warmup_key.set_contents_from_string(
|
| - self.file_contents[self.tcp_warmup_file],
|
| - md5=self.file_md5s[self.tcp_warmup_file])
|
| + self.gsutil_api.UploadObject(
|
| + cStringIO.StringIO(self.file_contents[self.tcp_warmup_file]),
|
| + warmup_target, provider=self.provider, fields=['name'])
|
| self._RunOperation(_Upload1)
|
|
|
| # Copy the file to remote location before reading.
|
| - k = self.bucket.key_class(self.bucket)
|
| - k.BufferSize = self.KEY_BUFFER_SIZE
|
| - k.key = os.path.basename(self.thru_local_file)
|
| + thru_url = self.bucket_url.Clone()
|
| + thru_url.object_name = os.path.basename(self.thru_local_file)
|
| + thru_target = StorageUrlToUploadObjectMetadata(thru_url)
|
| + thru_target.md5Hash = self.file_md5s[self.thru_local_file]
|
|
|
| + # Get the mediaLink here so that we can pass it to download.
|
| def _Upload2():
|
| - k.set_contents_from_string(self.file_contents[self.thru_local_file],
|
| - md5=self.file_md5s[self.thru_local_file])
|
| - self._RunOperation(_Upload2)
|
| + return self.gsutil_api.UploadObject(
|
| + cStringIO.StringIO(self.file_contents[self.thru_local_file]),
|
| + thru_target, provider=self.provider, size=self.thru_filesize,
|
| + fields=['name', 'mediaLink', 'size'])
|
|
|
| + # Get the metadata for the object so that we are just measuring performance
|
| + # on the actual bytes transfer.
|
| + download_metadata = self._RunOperation(_Upload2)
|
| + serialization_dict = GetDownloadSerializationDict(download_metadata)
|
| + serialization_data = json.dumps(serialization_dict)
|
| +
|
| if self.processes == 1 and self.threads == 1:
|
|
|
| # Warm up the TCP connection.
|
| def _Warmup():
|
| - warmup_key.get_contents_to_file(self.devnull,
|
| - **self.get_contents_to_file_args)
|
| + self.gsutil_api.GetObjectMedia(warmup_url.bucket_name,
|
| + warmup_url.object_name,
|
| + self.discard_sink,
|
| + provider=self.provider)
|
| self._RunOperation(_Warmup)
|
|
|
| times = []
|
|
|
| def _Download():
|
| t0 = time.time()
|
| - k.get_contents_to_file(self.devnull, **self.get_contents_to_file_args)
|
| + self.gsutil_api.GetObjectMedia(
|
| + thru_url.bucket_name, thru_url.object_name, self.discard_sink,
|
| + provider=self.provider, serialization_data=serialization_data)
|
| t1 = time.time()
|
| times.append(t1 - t0)
|
| for _ in range(self.num_iterations):
|
| self._RunOperation(_Download)
|
| time_took = sum(times)
|
| else:
|
| - args = [k] * self.num_iterations
|
| + args = ([(thru_url.bucket_name, thru_url.object_name, serialization_data)]
|
| + * self.num_iterations)
|
| self.logger.addFilter(self._CpFilter())
|
|
|
| t0 = time.time()
|
| - self.Apply(_DownloadKey,
|
| + self.Apply(_DownloadWrapper,
|
| args,
|
| _PerfdiagExceptionHandler,
|
| arg_checker=DummyArgChecker,
|
| @@ -561,36 +619,62 @@
|
| 'processes': self.processes,
|
| 'threads': self.threads}
|
|
|
| - k = self.bucket.key_class(self.bucket)
|
| - k.BufferSize = self.KEY_BUFFER_SIZE
|
| - k.key = os.path.basename(self.thru_local_file)
|
| + warmup_url = self.bucket_url.Clone()
|
| + warmup_url.object_name = os.path.basename(self.tcp_warmup_file)
|
| + warmup_target = StorageUrlToUploadObjectMetadata(warmup_url)
|
| +
|
| + thru_url = self.bucket_url.Clone()
|
| + thru_url.object_name = os.path.basename(self.thru_local_file)
|
| + thru_target = StorageUrlToUploadObjectMetadata(thru_url)
|
| + thru_tuples = []
|
| + for i in xrange(self.num_iterations):
|
| + # Create a unique name for each uploaded object. Otherwise,
|
| + # the XML API would fail when trying to non-atomically get metadata
|
| + # for the object that gets blown away by the overwrite.
|
| + thru_tuples.append(UploadObjectTuple(
|
| + thru_target.bucket, thru_target.name + str(i),
|
| + filepath=self.thru_local_file))
|
| +
|
| if self.processes == 1 and self.threads == 1:
|
| # Warm up the TCP connection.
|
| - warmup_key = self.bucket.key_class(self.bucket)
|
| - warmup_key.key = os.path.basename(self.tcp_warmup_file)
|
| -
|
| def _Warmup():
|
| - warmup_key.set_contents_from_string(
|
| - self.file_contents[self.tcp_warmup_file],
|
| - md5=self.file_md5s[self.tcp_warmup_file])
|
| + self.gsutil_api.UploadObject(
|
| + cStringIO.StringIO(self.file_contents[self.tcp_warmup_file]),
|
| + warmup_target, provider=self.provider, size=self.thru_filesize,
|
| + fields=['name'])
|
| self._RunOperation(_Warmup)
|
|
|
| times = []
|
|
|
| - def _Upload():
|
| - t0 = time.time()
|
| - k.set_contents_from_string(self.file_contents[self.thru_local_file],
|
| - md5=self.file_md5s[self.thru_local_file])
|
| - t1 = time.time()
|
| - times.append(t1 - t0)
|
| - for _ in range(self.num_iterations):
|
| + for i in xrange(self.num_iterations):
|
| + thru_tuple = thru_tuples[i]
|
| + def _Upload():
|
| + """Uploads the write throughput measurement object."""
|
| + upload_target = apitools_messages.Object(
|
| + bucket=thru_tuple.bucket_name, name=thru_tuple.object_name,
|
| + md5Hash=thru_tuple.md5)
|
| + io_fp = cStringIO.StringIO(self.file_contents[self.thru_local_file])
|
| + t0 = time.time()
|
| + if self.thru_filesize < ResumableThreshold():
|
| + self.gsutil_api.UploadObject(
|
| + io_fp, upload_target, provider=self.provider,
|
| + size=self.thru_filesize, fields=['name'])
|
| + else:
|
| + self.gsutil_api.UploadObjectResumable(
|
| + io_fp, upload_target, provider=self.provider,
|
| + size=self.thru_filesize, fields=['name'],
|
| + tracker_callback=_DummyTrackerCallback)
|
| +
|
| + t1 = time.time()
|
| + times.append(t1 - t0)
|
| +
|
| self._RunOperation(_Upload)
|
| time_took = sum(times)
|
|
|
| else:
|
| - args = [k] * self.num_iterations
|
| + args = thru_tuples
|
| t0 = time.time()
|
| - self.Apply(_UploadKey,
|
| + self.Apply(_UploadWrapper,
|
| args,
|
| _PerfdiagExceptionHandler,
|
| arg_checker=DummyArgChecker,
|
| @@ -607,6 +691,136 @@
|
| self.results['write_throughput']['total_bytes_copied'] = total_bytes_copied
|
| self.results['write_throughput']['bytes_per_second'] = bytes_per_second
|
|
|
| + def _RunListTests(self):
|
| + """Runs eventual consistency listing latency tests."""
|
| + self.results['listing'] = {'num_files': self.num_iterations}
|
| +
|
| + # Generate N random object names to put in the bucket.
|
| + list_prefix = 'gsutil-perfdiag-list-'
|
| + list_objects = []
|
| + for _ in xrange(self.num_iterations):
|
| + list_objects.append(
|
| + u'%s%s' % (list_prefix, os.urandom(20).encode('hex')))
|
| +
|
| + # Add the objects to the bucket.
|
| + self.logger.info(
|
| + '\nWriting %s objects for listing test...', self.num_iterations)
|
| + empty_md5 = CalculateB64EncodedMd5FromContents(cStringIO.StringIO(''))
|
| + args = [
|
| + UploadObjectTuple(self.bucket_url.bucket_name, name, md5=empty_md5,
|
| + contents='') for name in list_objects]
|
| + self.Apply(_UploadWrapper, args, _PerfdiagExceptionHandler,
|
| + arg_checker=DummyArgChecker)
|
| +
|
| + list_latencies = []
|
| + files_seen = []
|
| + total_start_time = time.time()
|
| + expected_objects = set(list_objects)
|
| + found_objects = set()
|
| +
|
| + def _List():
|
| + """Lists and returns objects in the bucket. Also records latency."""
|
| + t0 = time.time()
|
| + objects = list(self.gsutil_api.ListObjects(
|
| + self.bucket_url.bucket_name, prefix=list_prefix, delimiter='/',
|
| + provider=self.provider, fields=['items/name']))
|
| + t1 = time.time()
|
| + list_latencies.append(t1 - t0)
|
| + return set([obj.data.name for obj in objects])
|
| +
|
| + self.logger.info(
|
| + 'Listing bucket %s waiting for %s objects to appear...',
|
| + self.bucket_url.bucket_name, self.num_iterations)
|
| + while expected_objects - found_objects:
|
| + def _ListAfterUpload():
|
| + names = _List()
|
| + found_objects.update(names & expected_objects)
|
| + files_seen.append(len(found_objects))
|
| + self._RunOperation(_ListAfterUpload)
|
| + if expected_objects - found_objects:
|
| + if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME:
|
| + self.logger.warning('Maximum time reached waiting for listing.')
|
| + break
|
| + total_end_time = time.time()
|
| +
|
| + self.results['listing']['insert'] = {
|
| + 'num_listing_calls': len(list_latencies),
|
| + 'list_latencies': list_latencies,
|
| + 'files_seen_after_listing': files_seen,
|
| + 'time_took': total_end_time - total_start_time,
|
| + }
|
| +
|
| + self.logger.info(
|
| + 'Deleting %s objects for listing test...', self.num_iterations)
|
| + self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler,
|
| + arg_checker=DummyArgChecker)
|
| +
|
| + self.logger.info(
|
| + 'Listing bucket %s waiting for %s objects to disappear...',
|
| + self.bucket_url.bucket_name, self.num_iterations)
|
| + list_latencies = []
|
| + files_seen = []
|
| + total_start_time = time.time()
|
| + found_objects = set(list_objects)
|
| + while found_objects:
|
| + def _ListAfterDelete():
|
| + names = _List()
|
| + found_objects.intersection_update(names)
|
| + files_seen.append(len(found_objects))
|
| + self._RunOperation(_ListAfterDelete)
|
| + if found_objects:
|
| + if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME:
|
| + self.logger.warning('Maximum time reached waiting for listing.')
|
| + break
|
| + total_end_time = time.time()
|
| +
|
| + self.results['listing']['delete'] = {
|
| + 'num_listing_calls': len(list_latencies),
|
| + 'list_latencies': list_latencies,
|
| + 'files_seen_after_listing': files_seen,
|
| + 'time_took': total_end_time - total_start_time,
|
| + }
|
| +
|
| + def Upload(self, thru_tuple, thread_state=None):
|
| + gsutil_api = GetCloudApiInstance(self, thread_state)
|
| +
|
| + md5hash = thru_tuple.md5
|
| + contents = thru_tuple.contents
|
| + if thru_tuple.filepath:
|
| + md5hash = self.file_md5s[thru_tuple.filepath]
|
| + contents = self.file_contents[thru_tuple.filepath]
|
| +
|
| + upload_target = apitools_messages.Object(
|
| + bucket=thru_tuple.bucket_name, name=thru_tuple.object_name,
|
| + md5Hash=md5hash)
|
| + file_size = len(contents)
|
| + if file_size < ResumableThreshold():
|
| + gsutil_api.UploadObject(
|
| + cStringIO.StringIO(contents), upload_target,
|
| + provider=self.provider, size=file_size, fields=['name'])
|
| + else:
|
| + gsutil_api.UploadObjectResumable(
|
| + cStringIO.StringIO(contents), upload_target,
|
| + provider=self.provider, size=file_size, fields=['name'],
|
| + tracker_callback=_DummyTrackerCallback)
|
| +
|
| + def Download(self, download_tuple, thread_state=None):
|
| + """Downloads a file.
|
| +
|
| + Args:
|
| + download_tuple: (bucket name, object name, serialization data for object).
|
| + thread_state: gsutil Cloud API instance to use for the download.
|
| + """
|
| + gsutil_api = GetCloudApiInstance(self, thread_state)
|
| + gsutil_api.GetObjectMedia(
|
| + download_tuple[0], download_tuple[1], self.discard_sink,
|
| + provider=self.provider, serialization_data=download_tuple[2])
|
| +
|
| + def Delete(self, thru_tuple, thread_state=None):
|
| + gsutil_api = thread_state or self.gsutil_api
|
| + gsutil_api.DeleteObject(
|
| + thru_tuple.bucket_name, thru_tuple.object_name, provider=self.provider)
|
| +
|
| def _GetDiskCounters(self):
|
| """Retrieves disk I/O statistics for all disks.
|
|
|
| @@ -702,12 +916,22 @@
|
| """Collects system information."""
|
| sysinfo = {}
|
|
|
| - # All exceptions that might be thrown from socket module calls.
|
| + # All exceptions that might be raised from socket module calls.
|
| socket_errors = (
|
| socket.error, socket.herror, socket.gaierror, socket.timeout)
|
|
|
| # Find out whether HTTPS is enabled in Boto.
|
| sysinfo['boto_https_enabled'] = boto.config.get('Boto', 'is_secure', True)
|
| +
|
| + # Look up proxy info.
|
| + proxy_host = boto.config.get('Boto', 'proxy', None)
|
| + proxy_port = boto.config.getint('Boto', 'proxy_port', 0)
|
| + sysinfo['using_proxy'] = bool(proxy_host)
|
| +
|
| + if boto.config.get('Boto', 'proxy_rdns', False):
|
| + self.logger.info('DNS lookups are disallowed in this environment, so '
|
| + 'some information is not included in this perfdiag run.')
|
| +
|
| # Get the local IP address from socket lib.
|
| try:
|
| sysinfo['ip_address'] = socket.gethostbyname(socket.gethostname())
|
| @@ -723,27 +947,38 @@
|
|
|
| # Execute a CNAME lookup on Google DNS to find what Google server
|
| # it's routing to.
|
| - cmd = ['nslookup', '-type=CNAME', self.GOOGLE_API_HOST]
|
| + cmd = ['nslookup', '-type=CNAME', self.XML_API_HOST]
|
| try:
|
| nslookup_cname_output = self._Exec(cmd, return_output=True)
|
| m = re.search(r' = (?P<googserv>[^.]+)\.', nslookup_cname_output)
|
| sysinfo['googserv_route'] = m.group('googserv') if m else None
|
| - except OSError:
|
| + except (CommandException, OSError):
|
| sysinfo['googserv_route'] = ''
|
|
|
| + # Try to determine the latency of a DNS lookup for the Google hostname
|
| + # endpoint. Note: we don't piggyback on gethostbyname_ex below because
|
| + # the _ex version requires an extra RTT.
|
| + try:
|
| + t0 = time.time()
|
| + socket.gethostbyname(self.XML_API_HOST)
|
| + t1 = time.time()
|
| + sysinfo['google_host_dns_latency'] = t1 - t0
|
| + except socket_errors:
|
| + pass
|
| +
|
| # Look up IP addresses for Google Server.
|
| try:
|
| - (hostname, aliaslist, ipaddrlist) = socket.gethostbyname_ex(
|
| - self.GOOGLE_API_HOST)
|
| + (hostname, _, ipaddrlist) = socket.gethostbyname_ex(self.XML_API_HOST)
|
| sysinfo['googserv_ips'] = ipaddrlist
|
| except socket_errors:
|
| + ipaddrlist = []
|
| sysinfo['googserv_ips'] = []
|
|
|
| # Reverse lookup the hostnames for the Google Server IPs.
|
| sysinfo['googserv_hostnames'] = []
|
| for googserv_ip in ipaddrlist:
|
| try:
|
| - (hostname, aliaslist, ipaddrlist) = socket.gethostbyaddr(googserv_ip)
|
| + (hostname, _, ipaddrlist) = socket.gethostbyaddr(googserv_ip)
|
| sysinfo['googserv_hostnames'].append(hostname)
|
| except socket_errors:
|
| pass
|
| @@ -754,9 +989,43 @@
|
| nslookup_txt_output = self._Exec(cmd, return_output=True)
|
| m = re.search(r'text\s+=\s+"(?P<dnsip>[\.\d]+)"', nslookup_txt_output)
|
| sysinfo['dns_o-o_ip'] = m.group('dnsip') if m else None
|
| - except OSError:
|
| + except (CommandException, OSError):
|
| sysinfo['dns_o-o_ip'] = ''
|
|
|
| + # Try to determine the latency of connecting to the Google hostname
|
| + # endpoint.
|
| + sysinfo['google_host_connect_latencies'] = {}
|
| + for googserv_ip in ipaddrlist:
|
| + try:
|
| + sock = socket.socket()
|
| + t0 = time.time()
|
| + sock.connect((googserv_ip, self.XML_API_PORT))
|
| + t1 = time.time()
|
| + sysinfo['google_host_connect_latencies'][googserv_ip] = t1 - t0
|
| + except socket_errors:
|
| + pass
|
| +
|
| + # If using a proxy, try to determine the latency of a DNS lookup to resolve
|
| + # the proxy hostname and the latency of connecting to the proxy.
|
| + if proxy_host:
|
| + proxy_ip = None
|
| + try:
|
| + t0 = time.time()
|
| + proxy_ip = socket.gethostbyname(proxy_host)
|
| + t1 = time.time()
|
| + sysinfo['proxy_dns_latency'] = t1 - t0
|
| + except socket_errors:
|
| + pass
|
| +
|
| + try:
|
| + sock = socket.socket()
|
| + t0 = time.time()
|
| + sock.connect((proxy_ip or proxy_host, proxy_port))
|
| + t1 = time.time()
|
| + sysinfo['proxy_host_connect_latency'] = t1 - t0
|
| + except socket_errors:
|
| + pass
|
| +
|
| # Try and find the number of CPUs in the system if available.
|
| try:
|
| sysinfo['cpu_count'] = multiprocessing.cpu_count()
|
| @@ -902,6 +1171,33 @@
|
| print 'Read throughput: %s/s.' % (
|
| MakeBitsHumanReadable(read_thru['bytes_per_second'] * 8))
|
|
|
| + if 'listing' in self.results:
|
| + print
|
| + print '-' * 78
|
| + print 'Listing'.center(78)
|
| + print '-' * 78
|
| +
|
| + listing = self.results['listing']
|
| + insert = listing['insert']
|
| + delete = listing['delete']
|
| + print 'After inserting %s objects:' % listing['num_files']
|
| + print (' Total time for objects to appear: %.2g seconds' %
|
| + insert['time_took'])
|
| + print ' Number of listing calls made: %s' % insert['num_listing_calls']
|
| + print (' Individual listing call latencies: [%s]' %
|
| + ', '.join('%.2gs' % lat for lat in insert['list_latencies']))
|
| + print (' Files reflected after each call: [%s]' %
|
| + ', '.join(map(str, insert['files_seen_after_listing'])))
|
| +
|
| + print 'After deleting %s objects:' % listing['num_files']
|
| + print (' Total time for objects to appear: %.2g seconds' %
|
| + delete['time_took'])
|
| + print ' Number of listing calls made: %s' % delete['num_listing_calls']
|
| + print (' Individual listing call latencies: [%s]' %
|
| + ', '.join('%.2gs' % lat for lat in delete['list_latencies']))
|
| + print (' Files reflected after each call: [%s]' %
|
| + ', '.join(map(str, delete['files_seen_after_listing'])))
|
| +
|
| if 'sysinfo' in self.results:
|
| print
|
| print '-' * 78
|
| @@ -989,6 +1285,26 @@
|
| if 'boto_https_enabled' in info:
|
| print 'Boto HTTPS Enabled: \n %s' % info['boto_https_enabled']
|
|
|
| + if 'using_proxy' in info:
|
| + print 'Requests routed through proxy: \n %s' % info['using_proxy']
|
| +
|
| + if 'google_host_dns_latency' in info:
|
| + print ('Latency of the DNS lookup for Google Storage server (ms): '
|
| + '\n %.1f' % (info['google_host_dns_latency'] * 1000.0))
|
| +
|
| + if 'google_host_connect_latencies' in info:
|
| + print 'Latencies connecting to Google Storage server IPs (ms):'
|
| + for ip, latency in info['google_host_connect_latencies'].iteritems():
|
| + print ' %s = %.1f' % (ip, latency * 1000.0)
|
| +
|
| + if 'proxy_dns_latency' in info:
|
| + print ('Latency of the DNS lookup for the configured proxy (ms): '
|
| + '\n %.1f' % (info['proxy_dns_latency'] * 1000.0))
|
| +
|
| + if 'proxy_host_connect_latency' in info:
|
| + print ('Latency connecting to the configured proxy (ms): \n %.1f' %
|
| + (info['proxy_host_connect_latency'] * 1000.0))
|
| +
|
| if 'request_errors' in self.results and 'total_requests' in self.results:
|
| print
|
| print '-' * 78
|
| @@ -1050,7 +1366,7 @@
|
| # From -s.
|
| self.thru_filesize = 1048576
|
| # From -t.
|
| - self.diag_tests = self.ALL_DIAG_TESTS
|
| + self.diag_tests = self.DEFAULT_DIAG_TESTS
|
| # From -o.
|
| self.output_file = None
|
| # From -i.
|
| @@ -1109,20 +1425,20 @@
|
| if not self.args:
|
| raise CommandException('Wrong number of arguments for "perfdiag" '
|
| 'command.')
|
| - self.bucket_uri = self.suri_builder.StorageUri(self.args[0])
|
| - if not self.bucket_uri.names_bucket():
|
| - raise CommandException('The perfdiag command requires a URI that '
|
| +
|
| + self.bucket_url = StorageUrlFromString(self.args[0])
|
| + self.provider = self.bucket_url.scheme
|
| + if not (self.bucket_url.IsCloudUrl() and self.bucket_url.IsBucket()):
|
| + raise CommandException('The perfdiag command requires a URL that '
|
| 'specifies a bucket.\n"%s" is not '
|
| - 'valid.' % self.bucket_uri)
|
| - self.bucket = self.bucket_uri.get_bucket()
|
| + 'valid.' % self.args[0])
|
| + # Ensure the bucket exists.
|
| + self.gsutil_api.GetBucket(self.bucket_url.bucket_name,
|
| + provider=self.bucket_url.scheme,
|
| + fields=['id'])
|
| + self.exceptions = [httplib.HTTPException, socket.error, socket.gaierror,
|
| + httplib.BadStatusLine, ServiceException]
|
|
|
| - # TODO: Add MD5 argument support to get_contents_to_file()
|
| - # and pass the file md5 as a parameter to avoid any unnecessary
|
| - # computation.
|
| - self.get_contents_to_file_args = {}
|
| - if self.bucket_uri.scheme == 'gs':
|
| - self.get_contents_to_file_args = {'hash_algs': {}}
|
| -
|
| # Command entry point.
|
| def RunCommand(self):
|
| """Called by gsutil when the command is being invoked."""
|
| @@ -1144,7 +1460,7 @@
|
| 'Throughput file size: %s\n'
|
| 'Diagnostics to run: %s',
|
| self.num_iterations,
|
| - self.bucket_uri,
|
| + self.bucket_url,
|
| self.processes,
|
| self.threads,
|
| MakeHumanReadable(self.thru_filesize),
|
| @@ -1159,17 +1475,19 @@
|
| self.results['sysinfo']['netstat_start'] = self._GetTcpStats()
|
| if IS_LINUX:
|
| self.results['sysinfo']['disk_counters_start'] = self._GetDiskCounters()
|
| - # Record bucket URI.
|
| - self.results['bucket_uri'] = str(self.bucket_uri)
|
| + # Record bucket URL.
|
| + self.results['bucket_uri'] = str(self.bucket_url)
|
| self.results['json_format'] = 'perfdiag'
|
| self.results['metadata'] = self.metadata_keys
|
|
|
| - if 'lat' in self.diag_tests:
|
| + if self.LAT in self.diag_tests:
|
| self._RunLatencyTests()
|
| - if 'rthru' in self.diag_tests:
|
| + if self.RTHRU in self.diag_tests:
|
| self._RunReadThruTests()
|
| - if 'wthru' in self.diag_tests:
|
| + if self.WTHRU in self.diag_tests:
|
| self._RunWriteThruTests()
|
| + if self.LIST in self.diag_tests:
|
| + self._RunListTests()
|
|
|
| # Collect netstat info and disk counters after tests.
|
| self.results['sysinfo']['netstat_end'] = self._GetTcpStats()
|
| @@ -1188,3 +1506,48 @@
|
| self._TearDown()
|
|
|
| return 0
|
| +
|
| +
|
| +class UploadObjectTuple(object):
|
| + """Picklable tuple with necessary metadata for an insert object call."""
|
| +
|
| + def __init__(self, bucket_name, object_name, filepath=None, md5=None,
|
| + contents=None):
|
| + """Create an upload tuple.
|
| +
|
| + Args:
|
| + bucket_name: Name of the bucket to upload to.
|
| + object_name: Name of the object to upload to.
|
| + filepath: A file path located in self.file_contents and self.file_md5s.
|
| + md5: The MD5 hash of the object being uploaded.
|
| + contents: The contents of the file to be uploaded.
|
| +
|
| + Note: (contents + md5) and filepath are mutually exlusive. You may specify
|
| + one or the other, but not both.
|
| + Note: If one of contents or md5 are specified, they must both be specified.
|
| +
|
| + Raises:
|
| + InvalidArgument: if the arguments are invalid.
|
| + """
|
| + self.bucket_name = bucket_name
|
| + self.object_name = object_name
|
| + self.filepath = filepath
|
| + self.md5 = md5
|
| + self.contents = contents
|
| + if filepath and (md5 or contents is not None):
|
| + raise InvalidArgument(
|
| + 'Only one of filepath or (md5 + contents) may be specified.')
|
| + if not filepath and (not md5 or contents is None):
|
| + raise InvalidArgument(
|
| + 'Both md5 and contents must be specified.')
|
| +
|
| +
|
| +def StorageUrlToUploadObjectMetadata(storage_url):
|
| + if storage_url.IsCloudUrl() and storage_url.IsObject():
|
| + upload_target = apitools_messages.Object()
|
| + upload_target.name = storage_url.object_name
|
| + upload_target.bucket = storage_url.bucket_name
|
| + return upload_target
|
| + else:
|
| + raise CommandException('Non-cloud URL upload target %s was created in '
|
| + 'perfdiag implemenation.' % storage_url)
|
|
|