1 # -*- coding: utf-8 -*- | |
2 # Copyright 2012 Google Inc. All Rights Reserved. | |
3 # | |
4 # Licensed under the Apache License, Version 2.0 (the "License"); | |
5 # you may not use this file except in compliance with the License. | |
6 # You may obtain a copy of the License at | |
7 # | |
8 # http://www.apache.org/licenses/LICENSE-2.0 | |
9 # | |
10 # Unless required by applicable law or agreed to in writing, software | |
11 # distributed under the License is distributed on an "AS IS" BASIS, | |
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 # See the License for the specific language governing permissions and | |
14 # limitations under the License. | |
15 """Contains the perfdiag gsutil command.""" | |
16 | |
17 from __future__ import absolute_import | |
18 | |
19 import calendar | |
20 from collections import defaultdict | |
21 import contextlib | |
22 import cStringIO | |
23 import datetime | |
24 import httplib | |
25 import json | |
26 import logging | |
27 import math | |
28 import multiprocessing | |
29 import os | |
30 import random | |
31 import re | |
32 import socket | |
33 import string | |
34 import subprocess | |
35 import tempfile | |
36 import time | |
37 | |
38 import boto | |
39 import boto.gs.connection | |
40 | |
41 import gslib | |
42 from gslib.cloud_api import NotFoundException | |
43 from gslib.cloud_api import ServiceException | |
44 from gslib.cloud_api_helper import GetDownloadSerializationDict | |
45 from gslib.command import Command | |
46 from gslib.command import DummyArgChecker | |
47 from gslib.command_argument import CommandArgument | |
48 from gslib.commands import config | |
49 from gslib.cs_api_map import ApiSelector | |
50 from gslib.exception import CommandException | |
51 from gslib.hashing_helper import CalculateB64EncodedMd5FromContents | |
52 from gslib.storage_url import StorageUrlFromString | |
53 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages | |
54 from gslib.util import GetCloudApiInstance | |
55 from gslib.util import GetMaxRetryDelay | |
56 from gslib.util import HumanReadableToBytes | |
57 from gslib.util import IS_LINUX | |
58 from gslib.util import MakeBitsHumanReadable | |
59 from gslib.util import MakeHumanReadable | |
60 from gslib.util import Percentile | |
61 from gslib.util import ResumableThreshold | |
62 | |
63 | |
64 _SYNOPSIS = """ | |
65 gsutil perfdiag [-i in.json] | |
66 gsutil perfdiag [-o out.json] [-n iterations] [-c processes] | |
67 [-k threads] [-s size] [-t tests] url... | |
68 """ | |
69 | |
70 _DETAILED_HELP_TEXT = (""" | |
71 <B>SYNOPSIS</B> | |
72 """ + _SYNOPSIS + """ | |
73 | |
74 | |
75 <B>DESCRIPTION</B> | |
76   The perfdiag command runs a suite of diagnostic tests for a given Google | |
77   Cloud Storage bucket. | |
78 | |
79 The 'url' parameter must name an existing bucket (e.g. gs://foo) to which | |
80 the user has write permission. Several test files will be uploaded to and | |
81 downloaded from this bucket. All test files will be deleted at the completion | |
82 of the diagnostic if it finishes successfully. | |
83 | |
84 gsutil performance can be impacted by many factors at the client, server, | |
85 and in-between, such as: CPU speed; available memory; the access path to the | |
86 local disk; network bandwidth; contention and error rates along the path | |
87 between gsutil and Google; operating system buffering configuration; and | |
88 firewalls and other network elements. The perfdiag command is provided so | |
89 that customers can run a known measurement suite when troubleshooting | |
90 performance problems. | |
91 | |
92 | |
93 <B>PROVIDING DIAGNOSTIC OUTPUT TO GOOGLE CLOUD STORAGE TEAM</B> | |
94   If the Google Cloud Storage Team asks you to run a performance diagnostic, | |
95   please use the following command and email the output file (output.json) | |
96   to gs-team@google.com: | |
97 | |
98 gsutil perfdiag -o output.json gs://your-bucket | |
99 | |
100 | |
101 <B>OPTIONS</B> | |
102 -n Sets the number of iterations performed when downloading and | |
103 uploading files during latency and throughput tests. Defaults to | |
104 5. | |
105 | |
106 -c Sets the number of processes to use while running throughput | |
107 experiments. The default value is 1. | |
108 | |
109 -k Sets the number of threads per process to use while running | |
110 throughput experiments. Each process will receive an equal number | |
111 of threads. The default value is 1. | |
112 | |
113 -s Sets the size (in bytes) of the test file used to perform read | |
114 and write throughput tests. The default is 1 MiB. This can also | |
115 be specified using byte suffixes such as 500K or 1M. Note: these | |
116 values are interpreted as multiples of 1024 (K=1024, M=1024*1024, | |
117 etc.) | |
118 | |
119     -t          Sets the list of diagnostic tests to perform. By default the | |
120                 lat, rthru, and wthru tests are run (see the example below). | |
121                 Must be a comma-separated list of one or more of the following: | |
122 | |
123 lat | |
124                   Runs N iterations (set with -n) of writing a file of each | |
125                   test size, retrieving its metadata, reading it back, and | |
126                   deleting it. Records the latency of each operation. | |
127 | |
128 list | |
129 Write N (set with -n) objects to the bucket, record how long | |
130 it takes for the eventually consistent listing call to return | |
131 the N objects in its result, delete the N objects, then record | |
132 how long it takes listing to stop returning the N objects. | |
133 This test is off by default. | |
134 | |
135 rthru | |
136 Runs N (set with -n) read operations, with at most C | |
137 (set with -c) reads outstanding at any given time. | |
138 | |
139 wthru | |
140 Runs N (set with -n) write operations, with at most C | |
141 (set with -c) writes outstanding at any given time. | |
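
                  For example, to run only the latency and read throughput
                  tests with 10 iterations each:

                      gsutil perfdiag -t lat,rthru -n 10 gs://your-bucket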
142 | |
143 -m Adds metadata to the result JSON file. Multiple -m values can be | |
144 specified. Example: | |
145 | |
146 gsutil perfdiag -m "key1:value1" -m "key2:value2" \ | |
147 gs://bucketname/ | |
148 | |
149 Each metadata key will be added to the top-level "metadata" | |
150 dictionary in the output JSON file. | |
151 | |
152 -o Writes the results of the diagnostic to an output file. The output | |
153 is a JSON file containing system information and performance | |
154 diagnostic results. The file can be read and reported later using | |
155 the -i option. | |
156 | |
157     -i          Reads the JSON output file created with the -o option and | |
158                 prints a formatted description of the results. | |
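
              For example, to save diagnostic results to a file and review
              them later:

                  gsutil perfdiag -o out.json gs://your-bucket
                  gsutil perfdiag -i out.json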
159 | |
160 | |
161 <B>MEASURING AVAILABILITY</B> | |
162 The perfdiag command ignores the boto num_retries configuration parameter. | |
163 Instead, it always retries on HTTP errors in the 500 range and keeps track of | |
164 how many 500 errors were encountered during the test. The availability | |
165 measurement is reported at the end of the test. | |
166 | |
167   Note that HTTP requests are only tracked when they are issued by the main | |
168   gsutil process. When multiple processes or threads are used, the read and | |
169   write throughput measurements are performed by parallel workers, so the | |
170   availability numbers reported won't include those requests. | |
171 | |
172 | |
173 <B>NOTE</B> | |
174   The perfdiag command collects system information, including your IP | |
175   address, the results of DNS queries to Google servers, and network | |
176   statistics taken from the output of netstat -s. It will also | |
177 attempt to connect to your proxy server if you have one configured. None of | |
178 this information will be sent to Google unless you choose to send it. | |
179 """) | |
180 | |
181 | |
182 class Error(Exception): | |
183 """Base exception class for this module.""" | |
184 pass | |
185 | |
186 | |
187 class InvalidArgument(Error): | |
188 """Raised on invalid arguments to functions.""" | |
189 pass | |
190 | |
191 | |
192 def _DownloadWrapper(cls, arg, thread_state=None): | |
193 cls.Download(arg, thread_state=thread_state) | |
194 | |
195 | |
196 def _UploadWrapper(cls, arg, thread_state=None): | |
197 cls.Upload(arg, thread_state=thread_state) | |
198 | |
199 | |
200 def _DeleteWrapper(cls, arg, thread_state=None): | |
201 cls.Delete(arg, thread_state=thread_state) | |
202 | |
203 | |
204 def _PerfdiagExceptionHandler(cls, e): | |
205 """Simple exception handler to allow post-completion status.""" | |
206 cls.logger.error(str(e)) | |
207 | |
208 | |
209 def _DummyTrackerCallback(_): | |
210 pass | |
211 | |
212 | |
213 class DummyFile(object): | |
214 """A dummy, file-like object that throws away everything written to it.""" | |
215 | |
216 def write(self, *args, **kwargs): # pylint: disable=invalid-name | |
217 pass | |
218 | |
219 | |
220 # Many functions in perfdiag re-define a temporary function based on a | |
221 # variable from a loop, resulting in a false positive from the linter. | |
222 # pylint: disable=cell-var-from-loop | |
223 class PerfDiagCommand(Command): | |
224 """Implementation of gsutil perfdiag command.""" | |
225 | |
226 # Command specification. See base class for documentation. | |
227 command_spec = Command.CreateCommandSpec( | |
228 'perfdiag', | |
229 command_name_aliases=['diag', 'diagnostic', 'perf', 'performance'], | |
230 usage_synopsis=_SYNOPSIS, | |
231 min_args=0, | |
232 max_args=1, | |
233 supported_sub_args='n:c:k:s:t:m:i:o:', | |
234 file_url_ok=False, | |
235 provider_url_ok=False, | |
236 urls_start_arg=0, | |
237 gs_api_support=[ApiSelector.XML, ApiSelector.JSON], | |
238 gs_default_api=ApiSelector.JSON, | |
239 argparse_arguments=[ | |
240 CommandArgument.MakeNCloudBucketURLsArgument(1) | |
241 ] | |
242 ) | |
243 # Help specification. See help_provider.py for documentation. | |
244 help_spec = Command.HelpSpec( | |
245 help_name='perfdiag', | |
246 help_name_aliases=[], | |
247 help_type='command_help', | |
248 help_one_line_summary='Run performance diagnostic', | |
249 help_text=_DETAILED_HELP_TEXT, | |
250 subcommand_help_text={}, | |
251 ) | |
252 | |
253 # Byte sizes to use for latency testing files. | |
254 # TODO: Consider letting the user specify these sizes with a configuration | |
255 # parameter. | |
256 test_file_sizes = ( | |
257 0, # 0 bytes | |
258 1024, # 1 KiB | |
259 102400, # 100 KiB | |
260 1048576, # 1 MiB | |
261 ) | |
262 | |
263 # Test names. | |
264 RTHRU = 'rthru' | |
265 WTHRU = 'wthru' | |
266 LAT = 'lat' | |
267 LIST = 'list' | |
268 | |
269 # List of all diagnostic tests. | |
270 ALL_DIAG_TESTS = (RTHRU, WTHRU, LAT, LIST) | |
271 # List of diagnostic tests to run by default. | |
272 DEFAULT_DIAG_TESTS = (RTHRU, WTHRU, LAT) | |
273 | |
274 # Google Cloud Storage XML API endpoint host. | |
275 XML_API_HOST = boto.config.get( | |
276 'Credentials', 'gs_host', boto.gs.connection.GSConnection.DefaultHost) | |
277 # Google Cloud Storage XML API endpoint port. | |
278 XML_API_PORT = boto.config.get('Credentials', 'gs_port', 80) | |
279 | |
280 # Maximum number of times to retry requests on 5xx errors. | |
281 MAX_SERVER_ERROR_RETRIES = 5 | |
282 # Maximum number of times to retry requests on more serious errors like | |
283 # the socket breaking. | |
284 MAX_TOTAL_RETRIES = 10 | |
285 | |
286 # The default buffer size in boto's Key object is set to 8 KiB. This becomes a | |
287 # bottleneck at high throughput rates, so we increase it. | |
288 KEY_BUFFER_SIZE = 16384 | |
289 | |
290 # The maximum number of bytes to generate pseudo-randomly before beginning | |
291 # to repeat bytes. This number was chosen as the next prime larger than 5 MiB. | |
292 MAX_UNIQUE_RANDOM_BYTES = 5242883 | |
293 | |
294 # Maximum amount of time, in seconds, we will wait for object listings to | |
295 # reflect what we expect in the listing tests. | |
296 MAX_LISTING_WAIT_TIME = 60.0 | |
297 | |
298 def _Exec(self, cmd, raise_on_error=True, return_output=False, | |
299 mute_stderr=False): | |
300 """Executes a command in a subprocess. | |
301 | |
302 Args: | |
303 cmd: List containing the command to execute. | |
304 raise_on_error: Whether or not to raise an exception when a process exits | |
305 with a non-zero return code. | |
306 return_output: If set to True, the return value of the function is the | |
307 stdout of the process. | |
308 mute_stderr: If set to True, the stderr of the process is not printed to | |
309 the console. | |
310 | |
311 Returns: | |
312 The return code of the process or the stdout if return_output is set. | |
313 | |
314 Raises: | |
315 Exception: If raise_on_error is set to True and any process exits with a | |
316 non-zero return code. | |
317 """ | |
318 self.logger.debug('Running command: %s', cmd) | |
319 stderr = subprocess.PIPE if mute_stderr else None | |
320 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=stderr) | |
321 (stdoutdata, _) = p.communicate() | |
322 if raise_on_error and p.returncode: | |
323 raise CommandException("Received non-zero return code (%d) from " | |
324 "subprocess '%s'." % (p.returncode, ' '.join(cmd))) | |
325 return stdoutdata if return_output else p.returncode | |
326 | |
327 def _SetUp(self): | |
328 """Performs setup operations needed before diagnostics can be run.""" | |
329 | |
330 # Stores test result data. | |
331 self.results = {} | |
332 # List of test files in a temporary location on disk for latency ops. | |
333 self.latency_files = [] | |
334 # List of test objects to clean up in the test bucket. | |
335 self.test_object_names = set() | |
336 # Maps each test file path to its size in bytes. | |
337 self.file_sizes = {} | |
338 # Maps each test file to its contents as a string. | |
339 self.file_contents = {} | |
340 # Maps each test file to its MD5 hash. | |
341 self.file_md5s = {} | |
342 # Total number of HTTP requests made. | |
343 self.total_requests = 0 | |
344 # Total number of HTTP 5xx errors. | |
345 self.request_errors = 0 | |
346 # Number of responses, keyed by response code. | |
347 self.error_responses_by_code = defaultdict(int) | |
348 # Total number of socket errors. | |
349 self.connection_breaks = 0 | |
350 | |
351 def _MakeFile(file_size): | |
352 """Creates a temporary file of the given size and returns its path.""" | |
353 fd, fpath = tempfile.mkstemp(suffix='.bin', prefix='gsutil_test_file', | |
354 text=False) | |
355 self.file_sizes[fpath] = file_size | |
356 random_bytes = os.urandom(min(file_size, self.MAX_UNIQUE_RANDOM_BYTES)) | |
357 total_bytes = 0 | |
358 file_contents = '' | |
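      # Repeat the random buffer as needed until the contents reach file_size,
      # so that at most MAX_UNIQUE_RANDOM_BYTES of random data are generated.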
359 while total_bytes < file_size: | |
360 num_bytes = min(self.MAX_UNIQUE_RANDOM_BYTES, file_size - total_bytes) | |
361 file_contents += random_bytes[:num_bytes] | |
362 total_bytes += num_bytes | |
363 self.file_contents[fpath] = file_contents | |
364 with os.fdopen(fd, 'wb') as f: | |
365 f.write(self.file_contents[fpath]) | |
366 with open(fpath, 'rb') as f: | |
367 self.file_md5s[fpath] = CalculateB64EncodedMd5FromContents(f) | |
368 return fpath | |
369 | |
370 # Create files for latency tests. | |
371 for file_size in self.test_file_sizes: | |
372 fpath = _MakeFile(file_size) | |
373 self.latency_files.append(fpath) | |
374 | |
375     # Create a file for warming up the TCP connection. | |
376 self.tcp_warmup_file = _MakeFile(5 * 1024 * 1024) # 5 Mebibytes. | |
377 # Remote file to use for TCP warmup. | |
378 self.tcp_warmup_remote_file = (str(self.bucket_url) + | |
379 os.path.basename(self.tcp_warmup_file)) | |
380 | |
381 # Local file on disk for write throughput tests. | |
382 self.thru_local_file = _MakeFile(self.thru_filesize) | |
383 | |
384 # Dummy file buffer to use for downloading that goes nowhere. | |
385 self.discard_sink = DummyFile() | |
386 | |
387 def _TearDown(self): | |
388 """Performs operations to clean things up after performing diagnostics.""" | |
389 for fpath in self.latency_files + [self.thru_local_file, | |
390 self.tcp_warmup_file]: | |
391 try: | |
392 os.remove(fpath) | |
393 except OSError: | |
394 pass | |
395 | |
396 for object_name in self.test_object_names: | |
397 | |
398 def _Delete(): | |
399 try: | |
400 self.gsutil_api.DeleteObject(self.bucket_url.bucket_name, | |
401 object_name, | |
402 provider=self.provider) | |
403 except NotFoundException: | |
404 pass | |
405 | |
406 self._RunOperation(_Delete) | |
407 | |
408 @contextlib.contextmanager | |
409 def _Time(self, key, bucket): | |
410 """A context manager that measures time. | |
411 | |
412 A context manager that prints a status message before and after executing | |
413 the inner command and times how long the inner command takes. Keeps track of | |
414 the timing, aggregated by the given key. | |
415 | |
416 Args: | |
417       key: The key under which to aggregate the timing value in bucket. | |
418       bucket: A dictionary mapping keys to lists of timing values. | |
419 | |
420 Yields: | |
421 For the context manager. | |
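
    Example (illustrative; _DoUpload stands in for any timed operation):
      with self._Time('UPLOAD_1024', self.results['latency']):
        _DoUpload()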
422 """ | |
423 self.logger.info('%s starting...', key) | |
424 t0 = time.time() | |
425 yield | |
426 t1 = time.time() | |
427 bucket[key].append(t1 - t0) | |
428 self.logger.info('%s done.', key) | |
429 | |
430 def _RunOperation(self, func): | |
431 """Runs an operation with retry logic. | |
432 | |
433 Args: | |
434 func: The function to run. | |
435 | |
436 Returns: | |
437       The return value of func if it succeeds, or None if aborted. | |
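
    Example (illustrative; 'my-bucket' and 'my-object' are placeholders):
      def _Delete():
        self.gsutil_api.DeleteObject('my-bucket', 'my-object',
                                     provider=self.provider)
      self._RunOperation(_Delete)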
438 """ | |
439 # We retry on httplib exceptions that can happen if the socket was closed | |
440 # by the remote party or the connection broke because of network issues. | |
441     # Only ServiceExceptions with a status of 500 or greater are counted | |
442     # towards the server error retry limit. | |
443 success = False | |
444 server_error_retried = 0 | |
445 total_retried = 0 | |
446 i = 0 | |
447 return_val = None | |
448 while not success: | |
449 next_sleep = min(random.random() * (2 ** i) + 1, GetMaxRetryDelay()) | |
450 try: | |
451 return_val = func() | |
452 self.total_requests += 1 | |
453 success = True | |
454 except tuple(self.exceptions) as e: | |
455 total_retried += 1 | |
456 if total_retried > self.MAX_TOTAL_RETRIES: | |
457 self.logger.info('Reached maximum total retries. Not retrying.') | |
458 break | |
459 if isinstance(e, ServiceException): | |
460 if e.status >= 500: | |
461 self.error_responses_by_code[e.status] += 1 | |
462 self.total_requests += 1 | |
463 self.request_errors += 1 | |
464 server_error_retried += 1 | |
465 time.sleep(next_sleep) | |
466 else: | |
467 raise | |
468 if server_error_retried > self.MAX_SERVER_ERROR_RETRIES: | |
469 self.logger.info( | |
470 'Reached maximum server error retries. Not retrying.') | |
471 break | |
472 else: | |
473 self.connection_breaks += 1 | |
474 return return_val | |
475 | |
476 def _RunLatencyTests(self): | |
477 """Runs latency tests.""" | |
478 # Stores timing information for each category of operation. | |
479 self.results['latency'] = defaultdict(list) | |
480 | |
481 for i in range(self.num_iterations): | |
482 self.logger.info('\nRunning latency iteration %d...', i+1) | |
483 for fpath in self.latency_files: | |
484 url = self.bucket_url.Clone() | |
485 url.object_name = os.path.basename(fpath) | |
486 file_size = self.file_sizes[fpath] | |
487 readable_file_size = MakeHumanReadable(file_size) | |
488 | |
489 self.logger.info( | |
490 "\nFile of size %s located on disk at '%s' being diagnosed in the " | |
491 "cloud at '%s'.", readable_file_size, fpath, url) | |
492 | |
493 upload_target = StorageUrlToUploadObjectMetadata(url) | |
494 | |
495 def _Upload(): | |
496 io_fp = cStringIO.StringIO(self.file_contents[fpath]) | |
497 with self._Time('UPLOAD_%d' % file_size, self.results['latency']): | |
498 self.gsutil_api.UploadObject( | |
499 io_fp, upload_target, size=file_size, provider=self.provider, | |
500 fields=['name']) | |
501 self._RunOperation(_Upload) | |
502 | |
503 def _Metadata(): | |
504 with self._Time('METADATA_%d' % file_size, self.results['latency']): | |
505 return self.gsutil_api.GetObjectMetadata( | |
506 url.bucket_name, url.object_name, | |
507 provider=self.provider, fields=['name', 'contentType', | |
508 'mediaLink', 'size']) | |
509 # Download will get the metadata first if we don't pass it in. | |
510 download_metadata = self._RunOperation(_Metadata) | |
511 serialization_dict = GetDownloadSerializationDict(download_metadata) | |
512 serialization_data = json.dumps(serialization_dict) | |
513 | |
514 def _Download(): | |
515 with self._Time('DOWNLOAD_%d' % file_size, self.results['latency']): | |
516 self.gsutil_api.GetObjectMedia( | |
517 url.bucket_name, url.object_name, self.discard_sink, | |
518 provider=self.provider, serialization_data=serialization_data) | |
519 self._RunOperation(_Download) | |
520 | |
521 def _Delete(): | |
522 with self._Time('DELETE_%d' % file_size, self.results['latency']): | |
523 self.gsutil_api.DeleteObject(url.bucket_name, url.object_name, | |
524 provider=self.provider) | |
525 self._RunOperation(_Delete) | |
526 | |
527 class _CpFilter(logging.Filter): | |
528 | |
529 def filter(self, record): | |
530 # Used to prevent cp._LogCopyOperation from spewing output from | |
531 # subprocesses about every iteration. | |
532 msg = record.getMessage() | |
533 return not (('Copying file:///' in msg) or ('Copying gs://' in msg) or | |
534 ('Computing CRC' in msg)) | |
535 | |
536 def _PerfdiagExceptionHandler(self, e): | |
537 """Simple exception handler to allow post-completion status.""" | |
538 self.logger.error(str(e)) | |
539 | |
540 def _RunReadThruTests(self): | |
541 """Runs read throughput tests.""" | |
542 self.logger.info( | |
543 '\nRunning read throughput tests (%s iterations of size %s)' % | |
544 (self.num_iterations, MakeHumanReadable(self.thru_filesize))) | |
545 | |
546 self.results['read_throughput'] = {'file_size': self.thru_filesize, | |
547 'num_times': self.num_iterations, | |
548 'processes': self.processes, | |
549 'threads': self.threads} | |
550 | |
551 # Copy the TCP warmup file. | |
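    # The warmup transfer lets the connection ramp up (connection setup and
    # TCP slow start) before the timed measurements begin.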
552 warmup_url = self.bucket_url.Clone() | |
553 warmup_url.object_name = os.path.basename(self.tcp_warmup_file) | |
554 warmup_target = StorageUrlToUploadObjectMetadata(warmup_url) | |
555 self.test_object_names.add(warmup_url.object_name) | |
556 | |
557 def _Upload1(): | |
558 self.gsutil_api.UploadObject( | |
559 cStringIO.StringIO(self.file_contents[self.tcp_warmup_file]), | |
560 warmup_target, provider=self.provider, fields=['name']) | |
561 self._RunOperation(_Upload1) | |
562 | |
563     # Upload the file to the remote location before reading it back. | |
564 thru_url = self.bucket_url.Clone() | |
565 thru_url.object_name = os.path.basename(self.thru_local_file) | |
566 thru_target = StorageUrlToUploadObjectMetadata(thru_url) | |
567 thru_target.md5Hash = self.file_md5s[self.thru_local_file] | |
568 self.test_object_names.add(thru_url.object_name) | |
569 | |
570 # Get the mediaLink here so that we can pass it to download. | |
571 def _Upload2(): | |
572 return self.gsutil_api.UploadObject( | |
573 cStringIO.StringIO(self.file_contents[self.thru_local_file]), | |
574 thru_target, provider=self.provider, size=self.thru_filesize, | |
575 fields=['name', 'mediaLink', 'size']) | |
576 | |
577     # Get the metadata for the object up front so that the test measures only | |
578     # the performance of the actual byte transfer. | |
579 download_metadata = self._RunOperation(_Upload2) | |
580 serialization_dict = GetDownloadSerializationDict(download_metadata) | |
581 serialization_data = json.dumps(serialization_dict) | |
582 | |
583 if self.processes == 1 and self.threads == 1: | |
584 | |
585 # Warm up the TCP connection. | |
586 def _Warmup(): | |
587 self.gsutil_api.GetObjectMedia(warmup_url.bucket_name, | |
588 warmup_url.object_name, | |
589 self.discard_sink, | |
590 provider=self.provider) | |
591 self._RunOperation(_Warmup) | |
592 | |
593 times = [] | |
594 | |
595 def _Download(): | |
596 t0 = time.time() | |
597 self.gsutil_api.GetObjectMedia( | |
598 thru_url.bucket_name, thru_url.object_name, self.discard_sink, | |
599 provider=self.provider, serialization_data=serialization_data) | |
600 t1 = time.time() | |
601 times.append(t1 - t0) | |
602 for _ in range(self.num_iterations): | |
603 self._RunOperation(_Download) | |
604 time_took = sum(times) | |
605 else: | |
606 args = ([(thru_url.bucket_name, thru_url.object_name, serialization_data)] | |
607 * self.num_iterations) | |
608 self.logger.addFilter(self._CpFilter()) | |
609 | |
610 t0 = time.time() | |
611 self.Apply(_DownloadWrapper, | |
612 args, | |
613 _PerfdiagExceptionHandler, | |
614 arg_checker=DummyArgChecker, | |
615 parallel_operations_override=True, | |
616 process_count=self.processes, | |
617 thread_count=self.threads) | |
618 t1 = time.time() | |
619 time_took = t1 - t0 | |
620 | |
621 total_bytes_copied = self.thru_filesize * self.num_iterations | |
622 bytes_per_second = total_bytes_copied / time_took | |
623 | |
624 self.results['read_throughput']['time_took'] = time_took | |
625 self.results['read_throughput']['total_bytes_copied'] = total_bytes_copied | |
626 self.results['read_throughput']['bytes_per_second'] = bytes_per_second | |
627 | |
628 def _RunWriteThruTests(self): | |
629 """Runs write throughput tests.""" | |
630 self.logger.info( | |
631 '\nRunning write throughput tests (%s iterations of size %s)' % | |
632 (self.num_iterations, MakeHumanReadable(self.thru_filesize))) | |
633 | |
634 self.results['write_throughput'] = {'file_size': self.thru_filesize, | |
635 'num_copies': self.num_iterations, | |
636 'processes': self.processes, | |
637 'threads': self.threads} | |
638 | |
639 warmup_url = self.bucket_url.Clone() | |
640 warmup_url.object_name = os.path.basename(self.tcp_warmup_file) | |
641 warmup_target = StorageUrlToUploadObjectMetadata(warmup_url) | |
642 self.test_object_names.add(warmup_url.object_name) | |
643 | |
644 thru_url = self.bucket_url.Clone() | |
645 thru_url.object_name = os.path.basename(self.thru_local_file) | |
646 thru_target = StorageUrlToUploadObjectMetadata(thru_url) | |
647 thru_tuples = [] | |
648 for i in xrange(self.num_iterations): | |
649 # Create a unique name for each uploaded object. Otherwise, | |
650 # the XML API would fail when trying to non-atomically get metadata | |
651 # for the object that gets blown away by the overwrite. | |
652 remote_object_name = thru_target.name + str(i) | |
653 self.test_object_names.add(remote_object_name) | |
654 thru_tuples.append(UploadObjectTuple(thru_target.bucket, | |
655 remote_object_name, | |
656 filepath=self.thru_local_file)) | |
657 | |
658 if self.processes == 1 and self.threads == 1: | |
659 # Warm up the TCP connection. | |
660 def _Warmup(): | |
661 self.gsutil_api.UploadObject( | |
662 cStringIO.StringIO(self.file_contents[self.tcp_warmup_file]), | |
663 warmup_target, provider=self.provider, size=self.thru_filesize, | |
664 fields=['name']) | |
665 self._RunOperation(_Warmup) | |
666 | |
667 times = [] | |
668 | |
669 for i in xrange(self.num_iterations): | |
670 thru_tuple = thru_tuples[i] | |
671 def _Upload(): | |
672 """Uploads the write throughput measurement object.""" | |
673 upload_target = apitools_messages.Object( | |
674 bucket=thru_tuple.bucket_name, name=thru_tuple.object_name, | |
675 md5Hash=thru_tuple.md5) | |
676 io_fp = cStringIO.StringIO(self.file_contents[self.thru_local_file]) | |
677 t0 = time.time() | |
678 if self.thru_filesize < ResumableThreshold(): | |
679 self.gsutil_api.UploadObject( | |
680 io_fp, upload_target, provider=self.provider, | |
681 size=self.thru_filesize, fields=['name']) | |
682 else: | |
683 self.gsutil_api.UploadObjectResumable( | |
684 io_fp, upload_target, provider=self.provider, | |
685 size=self.thru_filesize, fields=['name'], | |
686 tracker_callback=_DummyTrackerCallback) | |
687 | |
688 t1 = time.time() | |
689 times.append(t1 - t0) | |
690 | |
691 self._RunOperation(_Upload) | |
692 time_took = sum(times) | |
693 | |
694 else: | |
695 args = thru_tuples | |
696 t0 = time.time() | |
697 self.Apply(_UploadWrapper, | |
698 args, | |
699 _PerfdiagExceptionHandler, | |
700 arg_checker=DummyArgChecker, | |
701 parallel_operations_override=True, | |
702 process_count=self.processes, | |
703 thread_count=self.threads) | |
704 t1 = time.time() | |
705 time_took = t1 - t0 | |
706 | |
707 total_bytes_copied = self.thru_filesize * self.num_iterations | |
708 bytes_per_second = total_bytes_copied / time_took | |
709 | |
710 self.results['write_throughput']['time_took'] = time_took | |
711 self.results['write_throughput']['total_bytes_copied'] = total_bytes_copied | |
712 self.results['write_throughput']['bytes_per_second'] = bytes_per_second | |
713 | |
714 def _RunListTests(self): | |
715 """Runs eventual consistency listing latency tests.""" | |
716 self.results['listing'] = {'num_files': self.num_iterations} | |
717 | |
718 # Generate N random object names to put in the bucket. | |
719 list_prefix = 'gsutil-perfdiag-list-' | |
720 list_objects = [] | |
721 for _ in xrange(self.num_iterations): | |
722 list_object_name = u'%s%s' % (list_prefix, os.urandom(20).encode('hex')) | |
723 self.test_object_names.add(list_object_name) | |
724 list_objects.append(list_object_name) | |
725 | |
726 # Add the objects to the bucket. | |
727 self.logger.info( | |
728 '\nWriting %s objects for listing test...', self.num_iterations) | |
729 empty_md5 = CalculateB64EncodedMd5FromContents(cStringIO.StringIO('')) | |
730 args = [ | |
731 UploadObjectTuple(self.bucket_url.bucket_name, name, md5=empty_md5, | |
732 contents='') for name in list_objects] | |
733 self.Apply(_UploadWrapper, args, _PerfdiagExceptionHandler, | |
734 arg_checker=DummyArgChecker) | |
735 | |
736 list_latencies = [] | |
737 files_seen = [] | |
738 total_start_time = time.time() | |
739 expected_objects = set(list_objects) | |
740 found_objects = set() | |
741 | |
742 def _List(): | |
743 """Lists and returns objects in the bucket. Also records latency.""" | |
744 t0 = time.time() | |
745 objects = list(self.gsutil_api.ListObjects( | |
746 self.bucket_url.bucket_name, prefix=list_prefix, delimiter='/', | |
747 provider=self.provider, fields=['items/name'])) | |
748 t1 = time.time() | |
749 list_latencies.append(t1 - t0) | |
750 return set([obj.data.name for obj in objects]) | |
751 | |
752 self.logger.info( | |
753 'Listing bucket %s waiting for %s objects to appear...', | |
754 self.bucket_url.bucket_name, self.num_iterations) | |
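    # Poll the listing until every uploaded object appears, or give up after
    # MAX_LISTING_WAIT_TIME seconds.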
755 while expected_objects - found_objects: | |
756 def _ListAfterUpload(): | |
757 names = _List() | |
758 found_objects.update(names & expected_objects) | |
759 files_seen.append(len(found_objects)) | |
760 self._RunOperation(_ListAfterUpload) | |
761 if expected_objects - found_objects: | |
762 if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME: | |
763 self.logger.warning('Maximum time reached waiting for listing.') | |
764 break | |
765 total_end_time = time.time() | |
766 | |
767 self.results['listing']['insert'] = { | |
768 'num_listing_calls': len(list_latencies), | |
769 'list_latencies': list_latencies, | |
770 'files_seen_after_listing': files_seen, | |
771 'time_took': total_end_time - total_start_time, | |
772 } | |
773 | |
774 self.logger.info( | |
775 'Deleting %s objects for listing test...', self.num_iterations) | |
776 self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler, | |
777 arg_checker=DummyArgChecker) | |
778 | |
779 self.logger.info( | |
780 'Listing bucket %s waiting for %s objects to disappear...', | |
781 self.bucket_url.bucket_name, self.num_iterations) | |
782 list_latencies = [] | |
783 files_seen = [] | |
784 total_start_time = time.time() | |
785 found_objects = set(list_objects) | |
786 while found_objects: | |
787 def _ListAfterDelete(): | |
788 names = _List() | |
789 found_objects.intersection_update(names) | |
790 files_seen.append(len(found_objects)) | |
791 self._RunOperation(_ListAfterDelete) | |
792 if found_objects: | |
793 if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME: | |
794 self.logger.warning('Maximum time reached waiting for listing.') | |
795 break | |
796 total_end_time = time.time() | |
797 | |
798 self.results['listing']['delete'] = { | |
799 'num_listing_calls': len(list_latencies), | |
800 'list_latencies': list_latencies, | |
801 'files_seen_after_listing': files_seen, | |
802 'time_took': total_end_time - total_start_time, | |
803 } | |
804 | |
805 def Upload(self, thru_tuple, thread_state=None): | |
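    """Uploads a single test object described by thru_tuple (Apply worker)."""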
806 gsutil_api = GetCloudApiInstance(self, thread_state) | |
807 | |
808 md5hash = thru_tuple.md5 | |
809 contents = thru_tuple.contents | |
810 if thru_tuple.filepath: | |
811 md5hash = self.file_md5s[thru_tuple.filepath] | |
812 contents = self.file_contents[thru_tuple.filepath] | |
813 | |
814 upload_target = apitools_messages.Object( | |
815 bucket=thru_tuple.bucket_name, name=thru_tuple.object_name, | |
816 md5Hash=md5hash) | |
817 file_size = len(contents) | |
818 if file_size < ResumableThreshold(): | |
819 gsutil_api.UploadObject( | |
820 cStringIO.StringIO(contents), upload_target, | |
821 provider=self.provider, size=file_size, fields=['name']) | |
822 else: | |
823 gsutil_api.UploadObjectResumable( | |
824 cStringIO.StringIO(contents), upload_target, | |
825 provider=self.provider, size=file_size, fields=['name'], | |
826 tracker_callback=_DummyTrackerCallback) | |
827 | |
828 def Download(self, download_tuple, thread_state=None): | |
829 """Downloads a file. | |
830 | |
831 Args: | |
832 download_tuple: (bucket name, object name, serialization data for object). | |
833 thread_state: gsutil Cloud API instance to use for the download. | |
834 """ | |
835 gsutil_api = GetCloudApiInstance(self, thread_state) | |
836 gsutil_api.GetObjectMedia( | |
837 download_tuple[0], download_tuple[1], self.discard_sink, | |
838 provider=self.provider, serialization_data=download_tuple[2]) | |
839 | |
840 def Delete(self, thru_tuple, thread_state=None): | |
841 gsutil_api = thread_state or self.gsutil_api | |
842 gsutil_api.DeleteObject( | |
843 thru_tuple.bucket_name, thru_tuple.object_name, provider=self.provider) | |
844 | |
845 def _GetDiskCounters(self): | |
846 """Retrieves disk I/O statistics for all disks. | |
847 | |
848 Adapted from the psutil module's psutil._pslinux.disk_io_counters: | |
849 http://code.google.com/p/psutil/source/browse/trunk/psutil/_pslinux.py | |
850 | |
851     Originally distributed under a BSD license. | |
852 Original Copyright (c) 2009, Jay Loden, Dave Daeschler, Giampaolo Rodola. | |
853 | |
854 Returns: | |
855 A dictionary containing disk names mapped to the disk counters from | |
856       /proc/diskstats. | |
857 """ | |
858     # iostat documentation states that sectors are equivalent to blocks and | |
859     # have a size of 512 bytes since the 2.4 kernel. This value is needed to | |
860     # calculate the amount of disk I/O in bytes. | |
861 sector_size = 512 | |
862 | |
863 partitions = [] | |
864 with open('/proc/partitions', 'r') as f: | |
865 lines = f.readlines()[2:] | |
866 for line in lines: | |
867 _, _, _, name = line.split() | |
868 if name[-1].isdigit(): | |
869 partitions.append(name) | |
870 | |
871 retdict = {} | |
872 with open('/proc/diskstats', 'r') as f: | |
873 for line in f: | |
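        # /proc/diskstats fields used below: major, minor, device name,
        # reads completed, reads merged, sectors read, ms spent reading,
        # writes completed, writes merged, sectors written, ms spent writing.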
874 values = line.split()[:11] | |
875 _, _, name, reads, _, rbytes, rtime, writes, _, wbytes, wtime = values | |
876 if name in partitions: | |
877 rbytes = int(rbytes) * sector_size | |
878 wbytes = int(wbytes) * sector_size | |
879 reads = int(reads) | |
880 writes = int(writes) | |
881 rtime = int(rtime) | |
882 wtime = int(wtime) | |
883 retdict[name] = (reads, writes, rbytes, wbytes, rtime, wtime) | |
884 return retdict | |
885 | |
886 def _GetTcpStats(self): | |
887 """Tries to parse out TCP packet information from netstat output. | |
888 | |
889 Returns: | |
890 A dictionary containing TCP information, or None if netstat is not | |
891 available. | |
892 """ | |
893 # netstat return code is non-zero for -s on Linux, so don't raise on error. | |
894 try: | |
895 netstat_output = self._Exec(['netstat', '-s'], return_output=True, | |
896 raise_on_error=False) | |
897 except OSError: | |
898 self.logger.warning('netstat not found on your system; some measurement ' | |
899 'data will be missing') | |
900 return None | |
901 netstat_output = netstat_output.strip().lower() | |
902 found_tcp = False | |
903 tcp_retransmit = None | |
904 tcp_received = None | |
905 tcp_sent = None | |
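    # Example lines matched below (Linux): '123456 segments received',
    # '1234 segments send out', '12 segments retransmited' (sic).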
906 for line in netstat_output.split('\n'): | |
907 # Header for TCP section is "Tcp:" in Linux/Mac and | |
908 # "TCP Statistics for" in Windows. | |
909 if 'tcp:' in line or 'tcp statistics' in line: | |
910 found_tcp = True | |
911 | |
912 # Linux == "segments retransmited" (sic), Mac == "retransmit timeouts" | |
913 # Windows == "segments retransmitted". | |
914 if (found_tcp and tcp_retransmit is None and | |
915 ('segments retransmited' in line or 'retransmit timeouts' in line or | |
916 'segments retransmitted' in line)): | |
917 tcp_retransmit = ''.join(c for c in line if c in string.digits) | |
918 | |
919 # Linux+Windows == "segments received", Mac == "packets received". | |
920 if (found_tcp and tcp_received is None and | |
921 ('segments received' in line or 'packets received' in line)): | |
922 tcp_received = ''.join(c for c in line if c in string.digits) | |
923 | |
924 # Linux == "segments send out" (sic), Mac+Windows == "packets sent". | |
925 if (found_tcp and tcp_sent is None and | |
926 ('segments send out' in line or 'packets sent' in line or | |
927 'segments sent' in line)): | |
928 tcp_sent = ''.join(c for c in line if c in string.digits) | |
929 | |
930 result = {} | |
931 try: | |
932 result['tcp_retransmit'] = int(tcp_retransmit) | |
933 result['tcp_received'] = int(tcp_received) | |
934 result['tcp_sent'] = int(tcp_sent) | |
935 except (ValueError, TypeError): | |
936 result['tcp_retransmit'] = None | |
937 result['tcp_received'] = None | |
938 result['tcp_sent'] = None | |
939 | |
940 return result | |
941 | |
942 def _CollectSysInfo(self): | |
943 """Collects system information.""" | |
944 sysinfo = {} | |
945 | |
946 # All exceptions that might be raised from socket module calls. | |
947 socket_errors = ( | |
948 socket.error, socket.herror, socket.gaierror, socket.timeout) | |
949 | |
950 # Find out whether HTTPS is enabled in Boto. | |
951 sysinfo['boto_https_enabled'] = boto.config.get('Boto', 'is_secure', True) | |
952 | |
953 # Look up proxy info. | |
954 proxy_host = boto.config.get('Boto', 'proxy', None) | |
955 proxy_port = boto.config.getint('Boto', 'proxy_port', 0) | |
956 sysinfo['using_proxy'] = bool(proxy_host) | |
957 | |
958 if boto.config.get('Boto', 'proxy_rdns', False): | |
959 self.logger.info('DNS lookups are disallowed in this environment, so ' | |
960 'some information is not included in this perfdiag run.') | |
961 | |
962 # Get the local IP address from socket lib. | |
963 try: | |
964 sysinfo['ip_address'] = socket.gethostbyname(socket.gethostname()) | |
965 except socket_errors: | |
966 sysinfo['ip_address'] = '' | |
967 # Record the temporary directory used since it can affect performance, e.g. | |
968 # when on a networked filesystem. | |
969 sysinfo['tempdir'] = tempfile.gettempdir() | |
970 | |
971 # Produces an RFC 2822 compliant GMT timestamp. | |
972 sysinfo['gmt_timestamp'] = time.strftime('%a, %d %b %Y %H:%M:%S +0000', | |
973 time.gmtime()) | |
974 | |
975 # Execute a CNAME lookup on Google DNS to find what Google server | |
976 # it's routing to. | |
977 cmd = ['nslookup', '-type=CNAME', self.XML_API_HOST] | |
978 try: | |
979 nslookup_cname_output = self._Exec(cmd, return_output=True) | |
980 m = re.search(r' = (?P<googserv>[^.]+)\.', nslookup_cname_output) | |
981 sysinfo['googserv_route'] = m.group('googserv') if m else None | |
982 except (CommandException, OSError): | |
983 sysinfo['googserv_route'] = '' | |
984 | |
985 # Try to determine the latency of a DNS lookup for the Google hostname | |
986 # endpoint. Note: we don't piggyback on gethostbyname_ex below because | |
987 # the _ex version requires an extra RTT. | |
988 try: | |
989 t0 = time.time() | |
990 socket.gethostbyname(self.XML_API_HOST) | |
991 t1 = time.time() | |
992 sysinfo['google_host_dns_latency'] = t1 - t0 | |
993 except socket_errors: | |
994 pass | |
995 | |
996 # Look up IP addresses for Google Server. | |
997 try: | |
998 (hostname, _, ipaddrlist) = socket.gethostbyname_ex(self.XML_API_HOST) | |
999 sysinfo['googserv_ips'] = ipaddrlist | |
1000 except socket_errors: | |
1001 ipaddrlist = [] | |
1002 sysinfo['googserv_ips'] = [] | |
1003 | |
1004 # Reverse lookup the hostnames for the Google Server IPs. | |
1005 sysinfo['googserv_hostnames'] = [] | |
1006 for googserv_ip in ipaddrlist: | |
1007 try: | |
1008 (hostname, _, ipaddrlist) = socket.gethostbyaddr(googserv_ip) | |
1009 sysinfo['googserv_hostnames'].append(hostname) | |
1010 except socket_errors: | |
1011 pass | |
1012 | |
1013     # Query o-o.myaddr.google.com to find out which IP Google DNS sees for us. | |
1014 try: | |
1015 cmd = ['nslookup', '-type=TXT', 'o-o.myaddr.google.com.'] | |
1016 nslookup_txt_output = self._Exec(cmd, return_output=True) | |
1017 m = re.search(r'text\s+=\s+"(?P<dnsip>[\.\d]+)"', nslookup_txt_output) | |
1018 sysinfo['dns_o-o_ip'] = m.group('dnsip') if m else None | |
1019 except (CommandException, OSError): | |
1020 sysinfo['dns_o-o_ip'] = '' | |
1021 | |
1022 # Try to determine the latency of connecting to the Google hostname | |
1023 # endpoint. | |
1024 sysinfo['google_host_connect_latencies'] = {} | |
1025 for googserv_ip in ipaddrlist: | |
1026 try: | |
1027 sock = socket.socket() | |
1028 t0 = time.time() | |
1029 sock.connect((googserv_ip, self.XML_API_PORT)) | |
1030 t1 = time.time() | |
1031 sysinfo['google_host_connect_latencies'][googserv_ip] = t1 - t0 | |
1032 except socket_errors: | |
1033 pass | |
1034 | |
1035 # If using a proxy, try to determine the latency of a DNS lookup to resolve | |
1036 # the proxy hostname and the latency of connecting to the proxy. | |
1037 if proxy_host: | |
1038 proxy_ip = None | |
1039 try: | |
1040 t0 = time.time() | |
1041 proxy_ip = socket.gethostbyname(proxy_host) | |
1042 t1 = time.time() | |
1043 sysinfo['proxy_dns_latency'] = t1 - t0 | |
1044 except socket_errors: | |
1045 pass | |
1046 | |
1047 try: | |
1048 sock = socket.socket() | |
1049 t0 = time.time() | |
1050 sock.connect((proxy_ip or proxy_host, proxy_port)) | |
1051 t1 = time.time() | |
1052 sysinfo['proxy_host_connect_latency'] = t1 - t0 | |
1053 except socket_errors: | |
1054 pass | |
1055 | |
1056 # Try and find the number of CPUs in the system if available. | |
1057 try: | |
1058 sysinfo['cpu_count'] = multiprocessing.cpu_count() | |
1059 except NotImplementedError: | |
1060 sysinfo['cpu_count'] = None | |
1061 | |
1062 # For *nix platforms, obtain the CPU load. | |
1063 try: | |
1064 sysinfo['load_avg'] = list(os.getloadavg()) | |
1065 except (AttributeError, OSError): | |
1066 sysinfo['load_avg'] = None | |
1067 | |
1068 # Try and collect memory information from /proc/meminfo if possible. | |
1069 mem_total = None | |
1070 mem_free = None | |
1071 mem_buffers = None | |
1072 mem_cached = None | |
1073 | |
1074 try: | |
1075 with open('/proc/meminfo', 'r') as f: | |
1076 for line in f: | |
1077 if line.startswith('MemTotal'): | |
1078 mem_total = (int(''.join(c for c in line if c in string.digits)) | |
1079 * 1000) | |
1080 elif line.startswith('MemFree'): | |
1081 mem_free = (int(''.join(c for c in line if c in string.digits)) | |
1082 * 1000) | |
1083 elif line.startswith('Buffers'): | |
1084 mem_buffers = (int(''.join(c for c in line if c in string.digits)) | |
1085 * 1000) | |
1086 elif line.startswith('Cached'): | |
1087 mem_cached = (int(''.join(c for c in line if c in string.digits)) | |
1088 * 1000) | |
1089 except (IOError, ValueError): | |
1090 pass | |
1091 | |
1092 sysinfo['meminfo'] = {'mem_total': mem_total, | |
1093 'mem_free': mem_free, | |
1094 'mem_buffers': mem_buffers, | |
1095 'mem_cached': mem_cached} | |
1096 | |
1097 # Get configuration attributes from config module. | |
1098 sysinfo['gsutil_config'] = {} | |
1099 for attr in dir(config): | |
1100 attr_value = getattr(config, attr) | |
1101 # Filter out multiline strings that are not useful. | |
1102 if attr.isupper() and not (isinstance(attr_value, basestring) and | |
1103 '\n' in attr_value): | |
1104 sysinfo['gsutil_config'][attr] = attr_value | |
1105 | |
1106 sysinfo['tcp_proc_values'] = {} | |
1107 stats_to_check = [ | |
1108 '/proc/sys/net/core/rmem_default', | |
1109 '/proc/sys/net/core/rmem_max', | |
1110 '/proc/sys/net/core/wmem_default', | |
1111 '/proc/sys/net/core/wmem_max', | |
1112 '/proc/sys/net/ipv4/tcp_timestamps', | |
1113 '/proc/sys/net/ipv4/tcp_sack', | |
1114 '/proc/sys/net/ipv4/tcp_window_scaling', | |
1115 ] | |
1116 for fname in stats_to_check: | |
1117 try: | |
1118 with open(fname, 'r') as f: | |
1119 value = f.read() | |
1120 sysinfo['tcp_proc_values'][os.path.basename(fname)] = value.strip() | |
1121 except IOError: | |
1122 pass | |
1123 | |
1124 self.results['sysinfo'] = sysinfo | |
1125 | |
1126 def _DisplayStats(self, trials): | |
1127 """Prints out mean, standard deviation, median, and 90th percentile.""" | |
1128 n = len(trials) | |
1129 mean = float(sum(trials)) / n | |
1130 stdev = math.sqrt(sum((x - mean)**2 for x in trials) / n) | |
1131 | |
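    # Trial values are recorded in seconds and displayed in milliseconds.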
1132 print str(n).rjust(6), '', | |
1133 print ('%.1f' % (mean * 1000)).rjust(9), '', | |
1134 print ('%.1f' % (stdev * 1000)).rjust(12), '', | |
1135 print ('%.1f' % (Percentile(trials, 0.5) * 1000)).rjust(11), '', | |
1136 print ('%.1f' % (Percentile(trials, 0.9) * 1000)).rjust(11), '' | |
1137 | |
1138 def _DisplayResults(self): | |
1139 """Displays results collected from diagnostic run.""" | |
1140 print | |
1141 print '=' * 78 | |
1142 print 'DIAGNOSTIC RESULTS'.center(78) | |
1143 print '=' * 78 | |
1144 | |
1145 if 'latency' in self.results: | |
1146 print | |
1147 print '-' * 78 | |
1148 print 'Latency'.center(78) | |
1149 print '-' * 78 | |
1150 print ('Operation Size Trials Mean (ms) Std Dev (ms) ' | |
1151 'Median (ms) 90th % (ms)') | |
1152 print ('========= ========= ====== ========= ============ ' | |
1153 '=========== ===========') | |
1154 for key in sorted(self.results['latency']): | |
1155 trials = sorted(self.results['latency'][key]) | |
1156 op, numbytes = key.split('_') | |
1157 numbytes = int(numbytes) | |
1158 if op == 'METADATA': | |
1159 print 'Metadata'.rjust(9), '', | |
1160 print MakeHumanReadable(numbytes).rjust(9), '', | |
1161 self._DisplayStats(trials) | |
1162 if op == 'DOWNLOAD': | |
1163 print 'Download'.rjust(9), '', | |
1164 print MakeHumanReadable(numbytes).rjust(9), '', | |
1165 self._DisplayStats(trials) | |
1166 if op == 'UPLOAD': | |
1167 print 'Upload'.rjust(9), '', | |
1168 print MakeHumanReadable(numbytes).rjust(9), '', | |
1169 self._DisplayStats(trials) | |
1170 if op == 'DELETE': | |
1171 print 'Delete'.rjust(9), '', | |
1172 print MakeHumanReadable(numbytes).rjust(9), '', | |
1173 self._DisplayStats(trials) | |
1174 | |
1175 if 'write_throughput' in self.results: | |
1176 print | |
1177 print '-' * 78 | |
1178 print 'Write Throughput'.center(78) | |
1179 print '-' * 78 | |
1180 write_thru = self.results['write_throughput'] | |
1181 print 'Copied a %s file %d times for a total transfer size of %s.' % ( | |
1182 MakeHumanReadable(write_thru['file_size']), | |
1183 write_thru['num_copies'], | |
1184 MakeHumanReadable(write_thru['total_bytes_copied'])) | |
1185 print 'Write throughput: %s/s.' % ( | |
1186 MakeBitsHumanReadable(write_thru['bytes_per_second'] * 8)) | |
1187 | |
1188 if 'read_throughput' in self.results: | |
1189 print | |
1190 print '-' * 78 | |
1191 print 'Read Throughput'.center(78) | |
1192 print '-' * 78 | |
1193 read_thru = self.results['read_throughput'] | |
1194 print 'Copied a %s file %d times for a total transfer size of %s.' % ( | |
1195 MakeHumanReadable(read_thru['file_size']), | |
1196 read_thru['num_times'], | |
1197 MakeHumanReadable(read_thru['total_bytes_copied'])) | |
1198 print 'Read throughput: %s/s.' % ( | |
1199 MakeBitsHumanReadable(read_thru['bytes_per_second'] * 8)) | |
1200 | |
1201 if 'listing' in self.results: | |
1202 print | |
1203 print '-' * 78 | |
1204 print 'Listing'.center(78) | |
1205 print '-' * 78 | |
1206 | |
1207 listing = self.results['listing'] | |
1208 insert = listing['insert'] | |
1209 delete = listing['delete'] | |
1210 print 'After inserting %s objects:' % listing['num_files'] | |
1211 print (' Total time for objects to appear: %.2g seconds' % | |
1212 insert['time_took']) | |
1213 print ' Number of listing calls made: %s' % insert['num_listing_calls'] | |
1214 print (' Individual listing call latencies: [%s]' % | |
1215 ', '.join('%.2gs' % lat for lat in insert['list_latencies'])) | |
1216 print (' Files reflected after each call: [%s]' % | |
1217 ', '.join(map(str, insert['files_seen_after_listing']))) | |
1218 | |
1219 print 'After deleting %s objects:' % listing['num_files'] | |
1220 print (' Total time for objects to appear: %.2g seconds' % | |
1221 delete['time_took']) | |
1222 print ' Number of listing calls made: %s' % delete['num_listing_calls'] | |
1223 print (' Individual listing call latencies: [%s]' % | |
1224 ', '.join('%.2gs' % lat for lat in delete['list_latencies'])) | |
1225 print (' Files reflected after each call: [%s]' % | |
1226 ', '.join(map(str, delete['files_seen_after_listing']))) | |
1227 | |
1228 if 'sysinfo' in self.results: | |
1229 print | |
1230 print '-' * 78 | |
1231 print 'System Information'.center(78) | |
1232 print '-' * 78 | |
1233 info = self.results['sysinfo'] | |
1234 print 'IP Address: \n %s' % info['ip_address'] | |
1235 print 'Temporary Directory: \n %s' % info['tempdir'] | |
1236 print 'Bucket URI: \n %s' % self.results['bucket_uri'] | |
1237 print 'gsutil Version: \n %s' % self.results.get('gsutil_version', | |
1238 'Unknown') | |
1239 print 'boto Version: \n %s' % self.results.get('boto_version', 'Unknown') | |
1240 | |
1241 if 'gmt_timestamp' in info: | |
1242 ts_string = info['gmt_timestamp'] | |
1243 timetuple = None | |
1244 try: | |
1245           # Parse the RFC 2822 string into a GMT time tuple. | |
1246 timetuple = time.strptime(ts_string, '%a, %d %b %Y %H:%M:%S +0000') | |
1247 except ValueError: | |
1248 pass | |
1249 | |
1250 if timetuple: | |
1251           # Convert the GMT time tuple to a timestamp, then to local time. | |
1252 localtime = calendar.timegm(timetuple) | |
1253 localdt = datetime.datetime.fromtimestamp(localtime) | |
1254 print 'Measurement time: \n %s' % localdt.strftime( | |
1255 '%Y-%m-%d %I:%M:%S %p %Z') | |
1256 | |
1257 print 'Google Server: \n %s' % info['googserv_route'] | |
1258 print ('Google Server IP Addresses: \n %s' % | |
1259 ('\n '.join(info['googserv_ips']))) | |
1260 print ('Google Server Hostnames: \n %s' % | |
1261 ('\n '.join(info['googserv_hostnames']))) | |
1262 print 'Google DNS thinks your IP is: \n %s' % info['dns_o-o_ip'] | |
1263 print 'CPU Count: \n %s' % info['cpu_count'] | |
1264 print 'CPU Load Average: \n %s' % info['load_avg'] | |
1265 try: | |
1266 print ('Total Memory: \n %s' % | |
1267 MakeHumanReadable(info['meminfo']['mem_total'])) | |
1268 # Free memory is really MemFree + Buffers + Cached. | |
1269 print 'Free Memory: \n %s' % MakeHumanReadable( | |
1270 info['meminfo']['mem_free'] + | |
1271 info['meminfo']['mem_buffers'] + | |
1272 info['meminfo']['mem_cached']) | |
1273 except TypeError: | |
1274 pass | |
1275 | |
1276 if 'netstat_end' in info and 'netstat_start' in info: | |
1277 netstat_after = info['netstat_end'] | |
1278 netstat_before = info['netstat_start'] | |
1279 for tcp_type in ('sent', 'received', 'retransmit'): | |
1280 try: | |
1281 delta = (netstat_after['tcp_%s' % tcp_type] - | |
1282 netstat_before['tcp_%s' % tcp_type]) | |
1283 print 'TCP segments %s during test:\n %d' % (tcp_type, delta) | |
1284 except TypeError: | |
1285 pass | |
1286 else: | |
1287 print ('TCP segment counts not available because "netstat" was not ' | |
1288 'found during test runs') | |
1289 | |
1290 if 'disk_counters_end' in info and 'disk_counters_start' in info: | |
1291 print 'Disk Counter Deltas:\n', | |
1292 disk_after = info['disk_counters_end'] | |
1293 disk_before = info['disk_counters_start'] | |
1294 print '', 'disk'.rjust(6), | |
1295 for colname in ['reads', 'writes', 'rbytes', 'wbytes', 'rtime', | |
1296 'wtime']: | |
1297 print colname.rjust(8), | |
1298 print | |
1299 for diskname in sorted(disk_after): | |
1300 before = disk_before[diskname] | |
1301 after = disk_after[diskname] | |
1302 (reads1, writes1, rbytes1, wbytes1, rtime1, wtime1) = before | |
1303 (reads2, writes2, rbytes2, wbytes2, rtime2, wtime2) = after | |
1304 print '', diskname.rjust(6), | |
1305 deltas = [reads2-reads1, writes2-writes1, rbytes2-rbytes1, | |
1306 wbytes2-wbytes1, rtime2-rtime1, wtime2-wtime1] | |
1307 for delta in deltas: | |
1308 print str(delta).rjust(8), | |
1309 print | |
1310 | |
1311 if 'tcp_proc_values' in info: | |
1312 print 'TCP /proc values:\n', | |
1313 for item in info['tcp_proc_values'].iteritems(): | |
1314 print ' %s = %s' % item | |
1315 | |
1316 if 'boto_https_enabled' in info: | |
1317 print 'Boto HTTPS Enabled: \n %s' % info['boto_https_enabled'] | |
1318 | |
1319 if 'using_proxy' in info: | |
1320 print 'Requests routed through proxy: \n %s' % info['using_proxy'] | |
1321 | |
1322 if 'google_host_dns_latency' in info: | |
1323 print ('Latency of the DNS lookup for Google Storage server (ms): ' | |
1324 '\n %.1f' % (info['google_host_dns_latency'] * 1000.0)) | |
1325 | |
1326 if 'google_host_connect_latencies' in info: | |
1327 print 'Latencies connecting to Google Storage server IPs (ms):' | |
1328 for ip, latency in info['google_host_connect_latencies'].iteritems(): | |
1329 print ' %s = %.1f' % (ip, latency * 1000.0) | |
1330 | |
1331 if 'proxy_dns_latency' in info: | |
1332 print ('Latency of the DNS lookup for the configured proxy (ms): ' | |
1333 '\n %.1f' % (info['proxy_dns_latency'] * 1000.0)) | |
1334 | |
1335 if 'proxy_host_connect_latency' in info: | |
1336 print ('Latency connecting to the configured proxy (ms): \n %.1f' % | |
1337 (info['proxy_host_connect_latency'] * 1000.0)) | |
1338 | |
1339 if 'request_errors' in self.results and 'total_requests' in self.results: | |
1340 print | |
1341 print '-' * 78 | |
1342 print 'In-Process HTTP Statistics'.center(78) | |
1343 print '-' * 78 | |
1344 total = int(self.results['total_requests']) | |
1345 numerrors = int(self.results['request_errors']) | |
1346 numbreaks = int(self.results['connection_breaks']) | |
1347 availability = (((total - numerrors) / float(total)) * 100 | |
1348 if total > 0 else 100) | |
1349 print 'Total HTTP requests made: %d' % total | |
1350 print 'HTTP 5xx errors: %d' % numerrors | |
1351 print 'HTTP connections broken: %d' % numbreaks | |
1352 print 'Availability: %.7g%%' % availability | |
1353 if 'error_responses_by_code' in self.results: | |
1354 sorted_codes = sorted( | |
1355 self.results['error_responses_by_code'].iteritems()) | |
1356 if sorted_codes: | |
1357 print 'Error responses by code:' | |
1358 print '\n'.join(' %s: %s' % c for c in sorted_codes) | |
1359 | |
1360 if self.output_file: | |
1361 with open(self.output_file, 'w') as f: | |
1362 json.dump(self.results, f, indent=2) | |
1363 print | |
1364 print "Output file written to '%s'." % self.output_file | |
1365 | |
1366 print | |
1367 | |
1368 def _ParsePositiveInteger(self, val, msg): | |
1369 """Tries to convert val argument to a positive integer. | |
1370 | |
1371 Args: | |
1372 val: The value (as a string) to convert to a positive integer. | |
1373 msg: The error message to place in the CommandException on an error. | |
1374 | |
1375 Returns: | |
1376 A valid positive integer. | |
1377 | |
1378 Raises: | |
1379 CommandException: If the supplied value is not a valid positive integer. | |
1380 """ | |
    try:
      val = int(val)
      if val < 1:
        raise CommandException(msg)
      return val
    except ValueError:
      raise CommandException(msg)

  def _ParseArgs(self):
    """Parses arguments for perfdiag command."""
    # From -n.
    self.num_iterations = 5
    # From -c.
    self.processes = 1
    # From -k.
    self.threads = 1
    # From -s.
    self.thru_filesize = 1048576
    # From -t.
    self.diag_tests = self.DEFAULT_DIAG_TESTS
    # From -o.
    self.output_file = None
    # From -i.
    self.input_file = None
    # From -m.
    self.metadata_keys = {}

    if self.sub_opts:
      for o, a in self.sub_opts:
        if o == '-n':
          self.num_iterations = self._ParsePositiveInteger(
              a, 'The -n parameter must be a positive integer.')
        if o == '-c':
          self.processes = self._ParsePositiveInteger(
              a, 'The -c parameter must be a positive integer.')
        if o == '-k':
          self.threads = self._ParsePositiveInteger(
              a, 'The -k parameter must be a positive integer.')
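        # -s accepts a human-readable size string (e.g. '1M' or '2G'), which
        # HumanReadableToBytes converts to a byte count.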
        if o == '-s':
          try:
            self.thru_filesize = HumanReadableToBytes(a)
          except ValueError:
            raise CommandException('Invalid -s parameter.')
          if self.thru_filesize > (20 * 1024 ** 3):  # Max 20 GiB.
            raise CommandException(
                'Maximum throughput file size parameter (-s) is 20 GiB.')
        if o == '-t':
          self.diag_tests = []
          for test_name in a.strip().split(','):
            if test_name.lower() not in self.ALL_DIAG_TESTS:
              raise CommandException("List of test names (-t) contains invalid "
                                     "test name '%s'." % test_name)
            self.diag_tests.append(test_name)
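        # -m attaches arbitrary key:value metadata to the results JSON
        # (stored under the 'metadata' key).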
        if o == '-m':
          pieces = a.split(':')
          if len(pieces) != 2:
            raise CommandException(
                "Invalid metadata key-value combination '%s'." % a)
          key, value = pieces
          self.metadata_keys[key] = value
        if o == '-o':
          self.output_file = os.path.abspath(a)
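        # -i replays a previously saved results file; the early return below
        # skips bucket validation since no tests will be run.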
        if o == '-i':
          self.input_file = os.path.abspath(a)
          if not os.path.isfile(self.input_file):
            raise CommandException("Invalid input file (-i): '%s'." % a)
          try:
            with open(self.input_file, 'r') as f:
              self.results = json.load(f)
              self.logger.info("Read input file: '%s'.", self.input_file)
          except ValueError:
            raise CommandException("Could not decode input file (-i): '%s'." %
                                   a)
          return
    if not self.args:
      self.RaiseWrongNumberOfArgumentsException()

    self.bucket_url = StorageUrlFromString(self.args[0])
    self.provider = self.bucket_url.scheme
    if not (self.bucket_url.IsCloudUrl() and self.bucket_url.IsBucket()):
      raise CommandException('The perfdiag command requires a URL that '
                             'specifies a bucket.\n"%s" is not '
                             'valid.' % self.args[0])
    # Ensure the bucket exists.
    self.gsutil_api.GetBucket(self.bucket_url.bucket_name,
                              provider=self.bucket_url.scheme,
                              fields=['id'])
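    # Exception types that the diagnostic operations catch and count as
    # request errors rather than letting them terminate the run.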
    self.exceptions = [httplib.HTTPException, socket.error, socket.gaierror,
                       socket.timeout, httplib.BadStatusLine,
                       ServiceException]

  # Command entry point.
  def RunCommand(self):
    """Called by gsutil when the command is being invoked."""
    self._ParseArgs()

    if self.input_file:
      self._DisplayResults()
      return 0

    # We turn off retries in the underlying boto library because the
    # _RunOperation function handles errors manually so it can count them.
    boto.config.set('Boto', 'num_retries', '0')

    self.logger.info(
        'Number of iterations to run: %d\n'
        'Base bucket URI: %s\n'
        'Number of processes: %d\n'
        'Number of threads: %d\n'
        'Throughput file size: %s\n'
        'Diagnostics to run: %s',
        self.num_iterations,
        self.bucket_url,
        self.processes,
        self.threads,
        MakeHumanReadable(self.thru_filesize),
        (', '.join(self.diag_tests)))

    try:
      self._SetUp()

      # Collect generic system info.
      self._CollectSysInfo()
      # Collect netstat info and disk counters before tests (and again later).
      netstat_output = self._GetTcpStats()
      if netstat_output:
        self.results['sysinfo']['netstat_start'] = netstat_output
      if IS_LINUX:
        self.results['sysinfo']['disk_counters_start'] = self._GetDiskCounters()
      # Record bucket URL.
      self.results['bucket_uri'] = str(self.bucket_url)
      self.results['json_format'] = 'perfdiag'
      self.results['metadata'] = self.metadata_keys

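      # Run only the diagnostics selected via -t (or the default set).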
      if self.LAT in self.diag_tests:
        self._RunLatencyTests()
      if self.RTHRU in self.diag_tests:
        self._RunReadThruTests()
      if self.WTHRU in self.diag_tests:
        self._RunWriteThruTests()
      if self.LIST in self.diag_tests:
        self._RunListTests()

      # Collect netstat info and disk counters after tests.
      netstat_output = self._GetTcpStats()
      if netstat_output:
        self.results['sysinfo']['netstat_end'] = netstat_output
      if IS_LINUX:
        self.results['sysinfo']['disk_counters_end'] = self._GetDiskCounters()

      self.results['total_requests'] = self.total_requests
      self.results['request_errors'] = self.request_errors
      self.results['error_responses_by_code'] = self.error_responses_by_code
      self.results['connection_breaks'] = self.connection_breaks
      self.results['gsutil_version'] = gslib.VERSION
      self.results['boto_version'] = boto.__version__
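      # self.results is now a plain dict ready for json.dump; a truncated,
      # illustrative example of its shape (values are hypothetical):
      #   {'json_format': 'perfdiag', 'bucket_uri': 'gs://my-bucket/',
      #    'total_requests': 110, 'request_errors': 0, ...}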

      self._DisplayResults()
    finally:
      # TODO: Install signal handlers so this is performed in response to a
      # terminating signal; consider multi-threaded object deletes during
      # cleanup so it happens quickly.
      self._TearDown()

    return 0


class UploadObjectTuple(object):
  """Picklable tuple with necessary metadata for an insert object call."""

  def __init__(self, bucket_name, object_name, filepath=None, md5=None,
               contents=None):
    """Create an upload tuple.

    Args:
      bucket_name: Name of the bucket to upload to.
      object_name: Name of the object to upload to.
      filepath: A file path located in self.file_contents and self.file_md5s.
      md5: The MD5 hash of the object being uploaded.
      contents: The contents of the file to be uploaded.

    Note: (contents + md5) and filepath are mutually exclusive. You may specify
      one or the other, but not both.
    Note: If either contents or md5 is specified, they must both be specified.

    Raises:
      InvalidArgument: if the arguments are invalid.
    """
    self.bucket_name = bucket_name
    self.object_name = object_name
    self.filepath = filepath
    self.md5 = md5
    self.contents = contents
    if filepath and (md5 or contents is not None):
      raise InvalidArgument(
          'Only one of filepath or (md5 + contents) may be specified.')
    if not filepath and (not md5 or contents is None):
      raise InvalidArgument(
          'Both md5 and contents must be specified.')

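# A minimal usage sketch for UploadObjectTuple (illustrative only; the bucket
# and object names below are hypothetical):
#
#   contents = os.urandom(1024)
#   md5 = CalculateB64EncodedMd5FromContents(cStringIO.StringIO(contents))
#   upload = UploadObjectTuple('my-bucket', 'perfdiag-test-object',
#                              md5=md5, contents=contents)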

def StorageUrlToUploadObjectMetadata(storage_url):
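  """Converts an object StorageUrl into apitools Object upload metadata."""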
  if storage_url.IsCloudUrl() and storage_url.IsObject():
    upload_target = apitools_messages.Object()
    upload_target.name = storage_url.object_name
    upload_target.bucket = storage_url.bucket_name
    return upload_target
  else:
    raise CommandException('Non-cloud URL upload target %s was created in '
                           'perfdiag implementation.' % storage_url)