Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(18)

Side by Side Diff: third_party/gsutil/gslib/commands/perfdiag.py

Issue 1380943003: Roll version of gsutil to 4.15. (Closed) Base URL: https://github.com/catapult-project/catapult.git@master
Patch Set: rebase Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « third_party/gsutil/gslib/commands/mb.py ('k') | third_party/gsutil/gslib/commands/rb.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # -*- coding: utf-8 -*- 1 # -*- coding: utf-8 -*-
2 # Copyright 2012 Google Inc. All Rights Reserved. 2 # Copyright 2012 Google Inc. All Rights Reserved.
3 # 3 #
4 # Licensed under the Apache License, Version 2.0 (the "License"); 4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License. 5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at 6 # You may obtain a copy of the License at
7 # 7 #
8 # http://www.apache.org/licenses/LICENSE-2.0 8 # http://www.apache.org/licenses/LICENSE-2.0
9 # 9 #
10 # Unless required by applicable law or agreed to in writing, software 10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, 11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and 13 # See the License for the specific language governing permissions and
14 # limitations under the License. 14 # limitations under the License.
15 """Contains the perfdiag gsutil command.""" 15 """Contains the perfdiag gsutil command."""
16 16
17 from __future__ import absolute_import 17 from __future__ import absolute_import
18 18
19 import calendar 19 import calendar
20 from collections import defaultdict 20 from collections import defaultdict
21 from collections import namedtuple
21 import contextlib 22 import contextlib
22 import cStringIO 23 import cStringIO
23 import datetime 24 import datetime
24 import httplib 25 import httplib
25 import json 26 import json
26 import logging 27 import logging
27 import math 28 import math
28 import multiprocessing 29 import multiprocessing
29 import os 30 import os
30 import random 31 import random
31 import re 32 import re
32 import socket 33 import socket
33 import string 34 import string
34 import subprocess 35 import subprocess
35 import tempfile 36 import tempfile
36 import time 37 import time
37 38
38 import boto 39 import boto
39 import boto.gs.connection 40 import boto.gs.connection
40 41
41 import gslib 42 import gslib
42 from gslib.cloud_api import NotFoundException 43 from gslib.cloud_api import NotFoundException
43 from gslib.cloud_api import ServiceException 44 from gslib.cloud_api import ServiceException
44 from gslib.cloud_api_helper import GetDownloadSerializationDict 45 from gslib.cloud_api_helper import GetDownloadSerializationData
45 from gslib.command import Command 46 from gslib.command import Command
46 from gslib.command import DummyArgChecker 47 from gslib.command import DummyArgChecker
47 from gslib.command_argument import CommandArgument 48 from gslib.command_argument import CommandArgument
48 from gslib.commands import config 49 from gslib.commands import config
49 from gslib.cs_api_map import ApiSelector 50 from gslib.cs_api_map import ApiSelector
50 from gslib.exception import CommandException 51 from gslib.exception import CommandException
52 from gslib.file_part import FilePart
51 from gslib.hashing_helper import CalculateB64EncodedMd5FromContents 53 from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
52 from gslib.storage_url import StorageUrlFromString 54 from gslib.storage_url import StorageUrlFromString
53 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_m essages 55 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_m essages
56 from gslib.util import CheckFreeSpace
57 from gslib.util import DivideAndCeil
54 from gslib.util import GetCloudApiInstance 58 from gslib.util import GetCloudApiInstance
59 from gslib.util import GetFileSize
55 from gslib.util import GetMaxRetryDelay 60 from gslib.util import GetMaxRetryDelay
56 from gslib.util import HumanReadableToBytes 61 from gslib.util import HumanReadableToBytes
57 from gslib.util import IS_LINUX 62 from gslib.util import IS_LINUX
58 from gslib.util import MakeBitsHumanReadable 63 from gslib.util import MakeBitsHumanReadable
59 from gslib.util import MakeHumanReadable 64 from gslib.util import MakeHumanReadable
60 from gslib.util import Percentile 65 from gslib.util import Percentile
61 from gslib.util import ResumableThreshold 66 from gslib.util import ResumableThreshold
62 67
63
64 _SYNOPSIS = """ 68 _SYNOPSIS = """
65 gsutil perfdiag [-i in.json] 69 gsutil perfdiag [-i in.json]
66 gsutil perfdiag [-o out.json] [-n iterations] [-c processes] 70 gsutil perfdiag [-o out.json] [-n objects] [-c processes]
67 [-k threads] [-s size] [-t tests] url... 71 [-k threads] [-p parallelism type] [-y slices] [-s size] [-d directory]
72 [-t tests] url...
68 """ 73 """
69 74
70 _DETAILED_HELP_TEXT = (""" 75 _DETAILED_HELP_TEXT = ("""
71 <B>SYNOPSIS</B> 76 <B>SYNOPSIS</B>
72 """ + _SYNOPSIS + """ 77 """ + _SYNOPSIS + """
73 78
74 79
75 <B>DESCRIPTION</B> 80 <B>DESCRIPTION</B>
76 The perfdiag command runs a suite of diagnostic tests for a given Google 81 The perfdiag command runs a suite of diagnostic tests for a given Google
77 Storage bucket. 82 Storage bucket.
(...skipping 14 matching lines...) Expand all
92 97
93 <B>PROVIDING DIAGNOSTIC OUTPUT TO GOOGLE CLOUD STORAGE TEAM</B> 98 <B>PROVIDING DIAGNOSTIC OUTPUT TO GOOGLE CLOUD STORAGE TEAM</B>
94 If the Google Cloud Storage Team asks you to run a performance diagnostic 99 If the Google Cloud Storage Team asks you to run a performance diagnostic
95 please use the following command, and email the output file (output.json) 100 please use the following command, and email the output file (output.json)
96 to gs-team@google.com: 101 to gs-team@google.com:
97 102
98 gsutil perfdiag -o output.json gs://your-bucket 103 gsutil perfdiag -o output.json gs://your-bucket
99 104
100 105
101 <B>OPTIONS</B> 106 <B>OPTIONS</B>
102 -n Sets the number of iterations performed when downloading and 107 -n Sets the number of objects to use when downloading and uploading
103 uploading files during latency and throughput tests. Defaults to 108 files during tests. Defaults to 5.
104 5.
105 109
106 -c Sets the number of processes to use while running throughput 110 -c Sets the number of processes to use while running throughput
107 experiments. The default value is 1. 111 experiments. The default value is 1.
108 112
109 -k Sets the number of threads per process to use while running 113 -k Sets the number of threads per process to use while running
110 throughput experiments. Each process will receive an equal number 114 throughput experiments. Each process will receive an equal number
111 of threads. The default value is 1. 115 of threads. The default value is 1.
112 116
113 -s Sets the size (in bytes) of the test file used to perform read 117 Note: All specified threads and processes will be created, but may
114 and write throughput tests. The default is 1 MiB. This can also 118 not by saturated with work if too few objects (specified with -n)
115 be specified using byte suffixes such as 500K or 1M. Note: these 119 and too few components (specified with -y) are specified.
116 values are interpreted as multiples of 1024 (K=1024, M=1024*1024, 120
117 etc.) 121 -p Sets the type of parallelism to be used (only applicable when
122 threads or processes are specified and threads * processes > 1).
123 The default is to use fan. Must be one of the following:
124
125 fan
126 Use one thread per object. This is akin to using gsutil -m cp,
127 with sliced object download / parallel composite upload
128 disabled.
129
130 slice
131 Use Y (specified with -y) threads for each object, transferring
132 one object at a time. This is akin to using parallel object
133 download / parallel composite upload, without -m. Sliced
134 uploads not supported for s3.
135
136 both
137 Use Y (specified with -y) threads for each object, transferring
138 multiple objects at a time. This is akin to simultaneously
139 using sliced object download / parallel composite upload and
140 gsutil -m cp. Sliced uploads not supported for s3.
141
142 -y Sets the number of slices to divide each file/object into while
143 transferring data. Only applicable with the slice (or both)
144 parallelism type. The default is 4 slices.
145
146 -s Sets the size (in bytes) for each of the N (set with -n) objects
147 used in the read and write throughput tests. The default is 1 MiB.
148 This can also be specified using byte suffixes such as 500K or 1M.
149 Note: these values are interpreted as multiples of 1024 (K=1024,
150 M=1024*1024, etc.)
151 Note: If rthru_file or wthru_file are performed, N (set with -n)
152 times as much disk space as specified will be required for the
153 operation.
154
155 -d Sets the directory to store temporary local files in. If not
156 specified, a default temporary directory will be used.
118 157
119 -t Sets the list of diagnostic tests to perform. The default is to 158 -t Sets the list of diagnostic tests to perform. The default is to
120 run all diagnostic tests. Must be a comma-separated list 159 run the lat, rthru, and wthru diagnostic tests. Must be a
121 containing one or more of the following: 160 comma-separated list containing one or more of the following:
122 161
123 lat 162 lat
124 Runs N iterations (set with -n) of writing the file, 163 For N (set with -n) objects, write the object, retrieve its
125 retrieving its metadata, reading the file, and deleting 164 metadata, read the object, and finally delete the object.
126 the file. Records the latency of each operation. 165 Record the latency of each operation.
127 166
128 list 167 list
129 Write N (set with -n) objects to the bucket, record how long 168 Write N (set with -n) objects to the bucket, record how long
130 it takes for the eventually consistent listing call to return 169 it takes for the eventually consistent listing call to return
131 the N objects in its result, delete the N objects, then record 170 the N objects in its result, delete the N objects, then record
132 how long it takes listing to stop returning the N objects. 171 how long it takes listing to stop returning the N objects.
133 This test is off by default. 172 This test is off by default.
134 173
135 rthru 174 rthru
136 Runs N (set with -n) read operations, with at most C 175 Runs N (set with -n) read operations, with at most C
137 (set with -c) reads outstanding at any given time. 176 (set with -c) reads outstanding at any given time.
138 177
178 rthru_file
179 The same as rthru, but simultaneously writes data to the disk,
180 to gauge the performance impact of the local disk on downloads.
181
139 wthru 182 wthru
140 Runs N (set with -n) write operations, with at most C 183 Runs N (set with -n) write operations, with at most C
141 (set with -c) writes outstanding at any given time. 184 (set with -c) writes outstanding at any given time.
142 185
186 wthru_file
187 The same as wthru, but simultaneously reads data from the disk,
188 to gauge the performance impact of the local disk on uploads.
189
143 -m Adds metadata to the result JSON file. Multiple -m values can be 190 -m Adds metadata to the result JSON file. Multiple -m values can be
144 specified. Example: 191 specified. Example:
145 192
146 gsutil perfdiag -m "key1:value1" -m "key2:value2" \ 193 gsutil perfdiag -m "key1:val1" -m "key2:val2" gs://bucketname
147 gs://bucketname/
148 194
149 Each metadata key will be added to the top-level "metadata" 195 Each metadata key will be added to the top-level "metadata"
150 dictionary in the output JSON file. 196 dictionary in the output JSON file.
151 197
152 -o Writes the results of the diagnostic to an output file. The output 198 -o Writes the results of the diagnostic to an output file. The output
153 is a JSON file containing system information and performance 199 is a JSON file containing system information and performance
154 diagnostic results. The file can be read and reported later using 200 diagnostic results. The file can be read and reported later using
155 the -i option. 201 the -i option.
156 202
157 -i Reads the JSON output file created using the -o command and prints 203 -i Reads the JSON output file created using the -o command and prints
(...skipping 13 matching lines...) Expand all
171 217
172 218
173 <B>NOTE</B> 219 <B>NOTE</B>
174 The perfdiag command collects system information. It collects your IP address, 220 The perfdiag command collects system information. It collects your IP address,
175 executes DNS queries to Google servers and collects the results, and collects 221 executes DNS queries to Google servers and collects the results, and collects
176 network statistics information from the output of netstat -s. It will also 222 network statistics information from the output of netstat -s. It will also
177 attempt to connect to your proxy server if you have one configured. None of 223 attempt to connect to your proxy server if you have one configured. None of
178 this information will be sent to Google unless you choose to send it. 224 this information will be sent to Google unless you choose to send it.
179 """) 225 """)
180 226
227 FileDataTuple = namedtuple(
228 'FileDataTuple',
229 'size md5 data')
230
231 # Describes one object in a fanned download. If need_to_slice is specified as
232 # True, the object should be downloaded with the slice strategy. Other field
233 # names are the same as documented in PerfDiagCommand.Download.
234 FanDownloadTuple = namedtuple(
235 'FanDownloadTuple',
236 'need_to_slice object_name file_name serialization_data')
237
238 # Describes one slice in a sliced download.
239 # Field names are the same as documented in PerfDiagCommand.Download.
240 SliceDownloadTuple = namedtuple(
241 'SliceDownloadTuple',
242 'object_name file_name serialization_data start_byte end_byte')
243
244 # Describes one file in a fanned upload. If need_to_slice is specified as
245 # True, the file should be uploaded with the slice strategy. Other field
246 # names are the same as documented in PerfDiagCommand.Upload.
247 FanUploadTuple = namedtuple(
248 'FanUploadTuple',
249 'need_to_slice file_name object_name use_file')
250
251 # Describes one slice in a sliced upload.
252 # Field names are the same as documented in PerfDiagCommand.Upload.
253 SliceUploadTuple = namedtuple(
254 'SliceUploadTuple',
255 'file_name object_name use_file file_start file_size')
256
257 # Dict storing file_path:FileDataTuple for each temporary file used by
258 # perfdiag. This data should be kept outside of the PerfDiagCommand class
259 # since calls to Apply will make copies of all member data.
260 temp_file_dict = {}
261
181 262
182 class Error(Exception): 263 class Error(Exception):
183 """Base exception class for this module.""" 264 """Base exception class for this module."""
184 pass 265 pass
185 266
186 267
187 class InvalidArgument(Error): 268 class InvalidArgument(Error):
188 """Raised on invalid arguments to functions.""" 269 """Raised on invalid arguments to functions."""
189 pass 270 pass
190 271
191 272
192 def _DownloadWrapper(cls, arg, thread_state=None): 273 def _DownloadObject(cls, args, thread_state=None):
193 cls.Download(arg, thread_state=thread_state) 274 """Function argument to apply for performing fanned parallel downloads.
275
276 Args:
277 cls: The calling PerfDiagCommand class instance.
278 args: A FanDownloadTuple object describing this download.
279 thread_state: gsutil Cloud API instance to use for the operation.
280 """
281 cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
282 if args.need_to_slice:
283 cls.PerformSlicedDownload(args.object_name, args.file_name,
284 args.serialization_data)
285 else:
286 cls.Download(args.object_name, args.file_name, args.serialization_data)
194 287
195 288
196 def _UploadWrapper(cls, arg, thread_state=None): 289 def _DownloadSlice(cls, args, thread_state=None):
197 cls.Upload(arg, thread_state=thread_state) 290 """Function argument to apply for performing sliced downloads.
291
292 Args:
293 cls: The calling PerfDiagCommand class instance.
294 args: A SliceDownloadTuple object describing this download.
295 thread_state: gsutil Cloud API instance to use for the operation.
296 """
297 cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
298 cls.Download(args.object_name, args.file_name, args.serialization_data,
299 args.start_byte, args.end_byte)
198 300
199 301
200 def _DeleteWrapper(cls, arg, thread_state=None): 302 def _UploadObject(cls, args, thread_state=None):
201 cls.Delete(arg, thread_state=thread_state) 303 """Function argument to apply for performing fanned parallel uploads.
304
305 Args:
306 cls: The calling PerfDiagCommand class instance.
307 args: A FanUploadTuple object describing this upload.
308 thread_state: gsutil Cloud API instance to use for the operation.
309 """
310 cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
311 if args.need_to_slice:
312 cls.PerformSlicedUpload(args.file_name, args.object_name, args.use_file)
313 else:
314 cls.Upload(args.file_name, args.object_name, args.use_file)
315
316
317 def _UploadSlice(cls, args, thread_state=None):
318 """Function argument to apply for performing sliced parallel uploads.
319
320 Args:
321 cls: The calling PerfDiagCommand class instance.
322 args: A SliceUploadTuple object describing this upload.
323 thread_state: gsutil Cloud API instance to use for the operation.
324 """
325 cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
326 cls.Upload(args.file_name, args.object_name, args.use_file,
327 args.file_start, args.file_size)
328
329
330 def _DeleteWrapper(cls, object_name, thread_state=None):
331 """Function argument to apply for performing parallel object deletions.
332
333 Args:
334 cls: The calling PerfDiagCommand class instance.
335 object_name: The object name to delete from the test bucket.
336 thread_state: gsutil Cloud API instance to use for the operation.
337 """
338 cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
339 cls.Delete(object_name)
202 340
203 341
204 def _PerfdiagExceptionHandler(cls, e): 342 def _PerfdiagExceptionHandler(cls, e):
205 """Simple exception handler to allow post-completion status.""" 343 """Simple exception handler to allow post-completion status."""
206 cls.logger.error(str(e)) 344 cls.logger.error(str(e))
207 345
208 346
209 def _DummyTrackerCallback(_): 347 def _DummyTrackerCallback(_):
210 pass 348 pass
211 349
212 350
213 class DummyFile(object): 351 class DummyFile(object):
214 """A dummy, file-like object that throws away everything written to it.""" 352 """A dummy, file-like object that throws away everything written to it."""
215 353
216 def write(self, *args, **kwargs): # pylint: disable=invalid-name 354 def write(self, *args, **kwargs): # pylint: disable=invalid-name
217 pass 355 pass
218 356
357 def close(self): # pylint: disable=invalid-name
358 pass
359
219 360
220 # Many functions in perfdiag re-define a temporary function based on a 361 # Many functions in perfdiag re-define a temporary function based on a
221 # variable from a loop, resulting in a false positive from the linter. 362 # variable from a loop, resulting in a false positive from the linter.
222 # pylint: disable=cell-var-from-loop 363 # pylint: disable=cell-var-from-loop
223 class PerfDiagCommand(Command): 364 class PerfDiagCommand(Command):
224 """Implementation of gsutil perfdiag command.""" 365 """Implementation of gsutil perfdiag command."""
225 366
226 # Command specification. See base class for documentation. 367 # Command specification. See base class for documentation.
227 command_spec = Command.CreateCommandSpec( 368 command_spec = Command.CreateCommandSpec(
228 'perfdiag', 369 'perfdiag',
229 command_name_aliases=['diag', 'diagnostic', 'perf', 'performance'], 370 command_name_aliases=['diag', 'diagnostic', 'perf', 'performance'],
230 usage_synopsis=_SYNOPSIS, 371 usage_synopsis=_SYNOPSIS,
231 min_args=0, 372 min_args=0,
232 max_args=1, 373 max_args=1,
233 supported_sub_args='n:c:k:s:t:m:i:o:', 374 supported_sub_args='n:c:k:p:y:s:d:t:m:i:o:',
234 file_url_ok=False, 375 file_url_ok=False,
235 provider_url_ok=False, 376 provider_url_ok=False,
236 urls_start_arg=0, 377 urls_start_arg=0,
237 gs_api_support=[ApiSelector.XML, ApiSelector.JSON], 378 gs_api_support=[ApiSelector.XML, ApiSelector.JSON],
238 gs_default_api=ApiSelector.JSON, 379 gs_default_api=ApiSelector.JSON,
239 argparse_arguments=[ 380 argparse_arguments=[
240 CommandArgument.MakeNCloudBucketURLsArgument(1) 381 CommandArgument.MakeNCloudBucketURLsArgument(1)
241 ] 382 ]
242 ) 383 )
243 # Help specification. See help_provider.py for documentation. 384 # Help specification. See help_provider.py for documentation.
244 help_spec = Command.HelpSpec( 385 help_spec = Command.HelpSpec(
245 help_name='perfdiag', 386 help_name='perfdiag',
246 help_name_aliases=[], 387 help_name_aliases=[],
247 help_type='command_help', 388 help_type='command_help',
248 help_one_line_summary='Run performance diagnostic', 389 help_one_line_summary='Run performance diagnostic',
249 help_text=_DETAILED_HELP_TEXT, 390 help_text=_DETAILED_HELP_TEXT,
250 subcommand_help_text={}, 391 subcommand_help_text={},
251 ) 392 )
252 393
253 # Byte sizes to use for latency testing files. 394 # Byte sizes to use for latency testing files.
254 # TODO: Consider letting the user specify these sizes with a configuration 395 # TODO: Consider letting the user specify these sizes with a configuration
255 # parameter. 396 # parameter.
256 test_file_sizes = ( 397 test_lat_file_sizes = (
257 0, # 0 bytes 398 0, # 0 bytes
258 1024, # 1 KiB 399 1024, # 1 KiB
259 102400, # 100 KiB 400 102400, # 100 KiB
260 1048576, # 1 MiB 401 1048576, # 1 MiB
261 ) 402 )
262 403
263 # Test names. 404 # Test names.
264 RTHRU = 'rthru' 405 RTHRU = 'rthru'
406 RTHRU_FILE = 'rthru_file'
265 WTHRU = 'wthru' 407 WTHRU = 'wthru'
408 WTHRU_FILE = 'wthru_file'
266 LAT = 'lat' 409 LAT = 'lat'
267 LIST = 'list' 410 LIST = 'list'
268 411
412 # Parallelism strategies.
413 FAN = 'fan'
414 SLICE = 'slice'
415 BOTH = 'both'
416
269 # List of all diagnostic tests. 417 # List of all diagnostic tests.
270 ALL_DIAG_TESTS = (RTHRU, WTHRU, LAT, LIST) 418 ALL_DIAG_TESTS = (RTHRU, RTHRU_FILE, WTHRU, WTHRU_FILE, LAT, LIST)
419
271 # List of diagnostic tests to run by default. 420 # List of diagnostic tests to run by default.
272 DEFAULT_DIAG_TESTS = (RTHRU, WTHRU, LAT) 421 DEFAULT_DIAG_TESTS = (RTHRU, WTHRU, LAT)
273 422
423 # List of parallelism strategies.
424 PARALLEL_STRATEGIES = (FAN, SLICE, BOTH)
425
274 # Google Cloud Storage XML API endpoint host. 426 # Google Cloud Storage XML API endpoint host.
275 XML_API_HOST = boto.config.get( 427 XML_API_HOST = boto.config.get(
276 'Credentials', 'gs_host', boto.gs.connection.GSConnection.DefaultHost) 428 'Credentials', 'gs_host', boto.gs.connection.GSConnection.DefaultHost)
277 # Google Cloud Storage XML API endpoint port. 429 # Google Cloud Storage XML API endpoint port.
278 XML_API_PORT = boto.config.get('Credentials', 'gs_port', 80) 430 XML_API_PORT = boto.config.get('Credentials', 'gs_port', 80)
279 431
280 # Maximum number of times to retry requests on 5xx errors. 432 # Maximum number of times to retry requests on 5xx errors.
281 MAX_SERVER_ERROR_RETRIES = 5 433 MAX_SERVER_ERROR_RETRIES = 5
282 # Maximum number of times to retry requests on more serious errors like 434 # Maximum number of times to retry requests on more serious errors like
283 # the socket breaking. 435 # the socket breaking.
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
317 """ 469 """
318 self.logger.debug('Running command: %s', cmd) 470 self.logger.debug('Running command: %s', cmd)
319 stderr = subprocess.PIPE if mute_stderr else None 471 stderr = subprocess.PIPE if mute_stderr else None
320 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=stderr) 472 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=stderr)
321 (stdoutdata, _) = p.communicate() 473 (stdoutdata, _) = p.communicate()
322 if raise_on_error and p.returncode: 474 if raise_on_error and p.returncode:
323 raise CommandException("Received non-zero return code (%d) from " 475 raise CommandException("Received non-zero return code (%d) from "
324 "subprocess '%s'." % (p.returncode, ' '.join(cmd))) 476 "subprocess '%s'." % (p.returncode, ' '.join(cmd)))
325 return stdoutdata if return_output else p.returncode 477 return stdoutdata if return_output else p.returncode
326 478
479 def _WarnIfLargeData(self):
480 """Outputs a warning message if a large amount of data is being used."""
481 if self.num_objects * self.thru_filesize > HumanReadableToBytes('2GiB'):
482 self.logger.info('This is a large operation, and could take a while.')
483
484 def _MakeTempFile(self, file_size=0, mem_metadata=False,
485 mem_data=False, prefix='gsutil_test_file'):
486 """Creates a temporary file of the given size and returns its path.
487
488 Args:
489 file_size: The size of the temporary file to create.
490 mem_metadata: If true, store md5 and file size in memory at
491 temp_file_dict[fpath].md5, tempfile_data[fpath].file_size.
492 mem_data: If true, store the file data in memory at
493 temp_file_dict[fpath].data
494 prefix: The prefix to use for the temporary file. Defaults to
495 gsutil_test_file.
496
497 Returns:
498 The file path of the created temporary file.
499 """
500 fd, fpath = tempfile.mkstemp(suffix='.bin', prefix=prefix,
501 dir=self.directory, text=False)
502 with os.fdopen(fd, 'wb') as fp:
503 random_bytes = os.urandom(min(file_size,
504 self.MAX_UNIQUE_RANDOM_BYTES))
505 total_bytes_written = 0
506 while total_bytes_written < file_size:
507 num_bytes = min(self.MAX_UNIQUE_RANDOM_BYTES,
508 file_size - total_bytes_written)
509 fp.write(random_bytes[:num_bytes])
510 total_bytes_written += num_bytes
511
512 if mem_metadata or mem_data:
513 with open(fpath, 'rb') as fp:
514 file_size = GetFileSize(fp) if mem_metadata else None
515 md5 = CalculateB64EncodedMd5FromContents(fp) if mem_metadata else None
516 data = fp.read() if mem_data else None
517 temp_file_dict[fpath] = FileDataTuple(file_size, md5, data)
518
519 self.temporary_files.add(fpath)
520 return fpath
521
327 def _SetUp(self): 522 def _SetUp(self):
328 """Performs setup operations needed before diagnostics can be run.""" 523 """Performs setup operations needed before diagnostics can be run."""
329 524
330 # Stores test result data. 525 # Stores test result data.
331 self.results = {} 526 self.results = {}
332 # List of test files in a temporary location on disk for latency ops. 527 # Set of file paths for local temporary files.
333 self.latency_files = [] 528 self.temporary_files = set()
334 # List of test objects to clean up in the test bucket. 529 # Set of names for test objects that exist in the test bucket.
335 self.test_object_names = set() 530 self.temporary_objects = set()
336 # Maps each test file path to its size in bytes.
337 self.file_sizes = {}
338 # Maps each test file to its contents as a string.
339 self.file_contents = {}
340 # Maps each test file to its MD5 hash.
341 self.file_md5s = {}
342 # Total number of HTTP requests made. 531 # Total number of HTTP requests made.
343 self.total_requests = 0 532 self.total_requests = 0
344 # Total number of HTTP 5xx errors. 533 # Total number of HTTP 5xx errors.
345 self.request_errors = 0 534 self.request_errors = 0
346 # Number of responses, keyed by response code. 535 # Number of responses, keyed by response code.
347 self.error_responses_by_code = defaultdict(int) 536 self.error_responses_by_code = defaultdict(int)
348 # Total number of socket errors. 537 # Total number of socket errors.
349 self.connection_breaks = 0 538 self.connection_breaks = 0
539 # Boolean to prevent doing cleanup twice.
540 self.teardown_completed = False
350 541
351 def _MakeFile(file_size): 542 # Create files for latency test.
352 """Creates a temporary file of the given size and returns its path.""" 543 if self.LAT in self.diag_tests:
353 fd, fpath = tempfile.mkstemp(suffix='.bin', prefix='gsutil_test_file', 544 self.latency_files = []
354 text=False) 545 for file_size in self.test_lat_file_sizes:
355 self.file_sizes[fpath] = file_size 546 fpath = self._MakeTempFile(file_size, mem_metadata=True, mem_data=True)
356 random_bytes = os.urandom(min(file_size, self.MAX_UNIQUE_RANDOM_BYTES)) 547 self.latency_files.append(fpath)
357 total_bytes = 0
358 file_contents = ''
359 while total_bytes < file_size:
360 num_bytes = min(self.MAX_UNIQUE_RANDOM_BYTES, file_size - total_bytes)
361 file_contents += random_bytes[:num_bytes]
362 total_bytes += num_bytes
363 self.file_contents[fpath] = file_contents
364 with os.fdopen(fd, 'wb') as f:
365 f.write(self.file_contents[fpath])
366 with open(fpath, 'rb') as f:
367 self.file_md5s[fpath] = CalculateB64EncodedMd5FromContents(f)
368 return fpath
369 548
370 # Create files for latency tests. 549 # Create files for throughput tests.
371 for file_size in self.test_file_sizes: 550 if self.diag_tests.intersection(
372 fpath = _MakeFile(file_size) 551 (self.RTHRU, self.WTHRU, self.RTHRU_FILE, self.WTHRU_FILE)):
373 self.latency_files.append(fpath) 552 # Create a file for warming up the TCP connection.
553 self.tcp_warmup_file = self._MakeTempFile(
554 5 * 1024 * 1024, mem_metadata=True, mem_data=True)
374 555
375 # Creating a file for warming up the TCP connection. 556 # For in memory tests, throughput tests transfer the same object N times
376 self.tcp_warmup_file = _MakeFile(5 * 1024 * 1024) # 5 Mebibytes. 557 # instead of creating N objects, in order to avoid excessive memory usage.
377 # Remote file to use for TCP warmup. 558 if self.diag_tests.intersection((self.RTHRU, self.WTHRU)):
378 self.tcp_warmup_remote_file = (str(self.bucket_url) + 559 self.mem_thru_file_name = self._MakeTempFile(
379 os.path.basename(self.tcp_warmup_file)) 560 self.thru_filesize, mem_metadata=True, mem_data=True)
561 self.mem_thru_object_name = os.path.basename(self.mem_thru_file_name)
380 562
381 # Local file on disk for write throughput tests. 563 # For tests that use disk I/O, it is necessary to create N objects in
382 self.thru_local_file = _MakeFile(self.thru_filesize) 564 # in order to properly measure the performance impact of seeks.
565 if self.diag_tests.intersection((self.RTHRU_FILE, self.WTHRU_FILE)):
566 # List of file names and corresponding object names to use for file
567 # throughput tests.
568 self.thru_file_names = []
569 self.thru_object_names = []
570
571 free_disk_space = CheckFreeSpace(self.directory)
572 if free_disk_space >= self.thru_filesize * self.num_objects:
573 self.logger.info('\nCreating %d local files each of size %s.'
574 % (self.num_objects,
575 MakeHumanReadable(self.thru_filesize)))
576 self._WarnIfLargeData()
577 for _ in range(self.num_objects):
578 file_name = self._MakeTempFile(self.thru_filesize,
579 mem_metadata=True)
580 self.thru_file_names.append(file_name)
581 self.thru_object_names.append(os.path.basename(file_name))
582 else:
583 raise CommandException(
584 'Not enough free disk space for throughput files: '
585 '%s of disk space required, but only %s available.'
586 % (MakeHumanReadable(self.thru_filesize * self.num_objects),
587 MakeHumanReadable(free_disk_space)))
383 588
384 # Dummy file buffer to use for downloading that goes nowhere. 589 # Dummy file buffer to use for downloading that goes nowhere.
385 self.discard_sink = DummyFile() 590 self.discard_sink = DummyFile()
386 591
592 # Filter out misleading progress callback output and the incorrect
593 # suggestion to use gsutil -m perfdiag.
594 self.logger.addFilter(self._PerfdiagFilter())
595
387 def _TearDown(self): 596 def _TearDown(self):
388 """Performs operations to clean things up after performing diagnostics.""" 597 """Performs operations to clean things up after performing diagnostics."""
389 for fpath in self.latency_files + [self.thru_local_file, 598 if not self.teardown_completed:
390 self.tcp_warmup_file]: 599 temp_file_dict.clear()
600
391 try: 601 try:
392 os.remove(fpath) 602 for fpath in self.temporary_files:
603 os.remove(fpath)
604 if self.delete_directory:
605 os.rmdir(self.directory)
393 except OSError: 606 except OSError:
394 pass 607 pass
395 608
396 for object_name in self.test_object_names: 609 if self.threads > 1 or self.processes > 1:
397 610 args = [obj for obj in self.temporary_objects]
398 def _Delete(): 611 self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler,
399 try: 612 arg_checker=DummyArgChecker,
400 self.gsutil_api.DeleteObject(self.bucket_url.bucket_name, 613 parallel_operations_override=True,
401 object_name, 614 process_count=self.processes, thread_count=self.threads)
402 provider=self.provider) 615 else:
403 except NotFoundException: 616 for object_name in self.temporary_objects:
404 pass 617 self.Delete(object_name)
405 618 self.teardown_completed = True
406 self._RunOperation(_Delete)
407 619
408 @contextlib.contextmanager 620 @contextlib.contextmanager
409 def _Time(self, key, bucket): 621 def _Time(self, key, bucket):
410 """A context manager that measures time. 622 """A context manager that measures time.
411 623
412 A context manager that prints a status message before and after executing 624 A context manager that prints a status message before and after executing
413 the inner command and times how long the inner command takes. Keeps track of 625 the inner command and times how long the inner command takes. Keeps track of
414 the timing, aggregated by the given key. 626 the timing, aggregated by the given key.
415 627
416 Args: 628 Args:
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
471 break 683 break
472 else: 684 else:
473 self.connection_breaks += 1 685 self.connection_breaks += 1
474 return return_val 686 return return_val
475 687
476 def _RunLatencyTests(self): 688 def _RunLatencyTests(self):
477 """Runs latency tests.""" 689 """Runs latency tests."""
478 # Stores timing information for each category of operation. 690 # Stores timing information for each category of operation.
479 self.results['latency'] = defaultdict(list) 691 self.results['latency'] = defaultdict(list)
480 692
481 for i in range(self.num_iterations): 693 for i in range(self.num_objects):
482 self.logger.info('\nRunning latency iteration %d...', i+1) 694 self.logger.info('\nRunning latency iteration %d...', i+1)
483 for fpath in self.latency_files: 695 for fpath in self.latency_files:
696 file_data = temp_file_dict[fpath]
484 url = self.bucket_url.Clone() 697 url = self.bucket_url.Clone()
485 url.object_name = os.path.basename(fpath) 698 url.object_name = os.path.basename(fpath)
486 file_size = self.file_sizes[fpath] 699 file_size = file_data.size
487 readable_file_size = MakeHumanReadable(file_size) 700 readable_file_size = MakeHumanReadable(file_size)
488 701
489 self.logger.info( 702 self.logger.info(
490 "\nFile of size %s located on disk at '%s' being diagnosed in the " 703 "\nFile of size %s located on disk at '%s' being diagnosed in the "
491 "cloud at '%s'.", readable_file_size, fpath, url) 704 "cloud at '%s'.", readable_file_size, fpath, url)
492 705
493 upload_target = StorageUrlToUploadObjectMetadata(url) 706 upload_target = StorageUrlToUploadObjectMetadata(url)
494 707
495 def _Upload(): 708 def _Upload():
496 io_fp = cStringIO.StringIO(self.file_contents[fpath]) 709 io_fp = cStringIO.StringIO(file_data.data)
497 with self._Time('UPLOAD_%d' % file_size, self.results['latency']): 710 with self._Time('UPLOAD_%d' % file_size, self.results['latency']):
498 self.gsutil_api.UploadObject( 711 self.gsutil_api.UploadObject(
499 io_fp, upload_target, size=file_size, provider=self.provider, 712 io_fp, upload_target, size=file_size, provider=self.provider,
500 fields=['name']) 713 fields=['name'])
501 self._RunOperation(_Upload) 714 self._RunOperation(_Upload)
502 715
503 def _Metadata(): 716 def _Metadata():
504 with self._Time('METADATA_%d' % file_size, self.results['latency']): 717 with self._Time('METADATA_%d' % file_size, self.results['latency']):
505 return self.gsutil_api.GetObjectMetadata( 718 return self.gsutil_api.GetObjectMetadata(
506 url.bucket_name, url.object_name, 719 url.bucket_name, url.object_name,
507 provider=self.provider, fields=['name', 'contentType', 720 provider=self.provider, fields=['name', 'contentType',
508 'mediaLink', 'size']) 721 'mediaLink', 'size'])
509 # Download will get the metadata first if we don't pass it in. 722 # Download will get the metadata first if we don't pass it in.
510 download_metadata = self._RunOperation(_Metadata) 723 download_metadata = self._RunOperation(_Metadata)
511 serialization_dict = GetDownloadSerializationDict(download_metadata) 724 serialization_data = GetDownloadSerializationData(download_metadata)
512 serialization_data = json.dumps(serialization_dict)
513 725
514 def _Download(): 726 def _Download():
515 with self._Time('DOWNLOAD_%d' % file_size, self.results['latency']): 727 with self._Time('DOWNLOAD_%d' % file_size, self.results['latency']):
516 self.gsutil_api.GetObjectMedia( 728 self.gsutil_api.GetObjectMedia(
517 url.bucket_name, url.object_name, self.discard_sink, 729 url.bucket_name, url.object_name, self.discard_sink,
518 provider=self.provider, serialization_data=serialization_data) 730 provider=self.provider, serialization_data=serialization_data)
519 self._RunOperation(_Download) 731 self._RunOperation(_Download)
520 732
521 def _Delete(): 733 def _Delete():
522 with self._Time('DELETE_%d' % file_size, self.results['latency']): 734 with self._Time('DELETE_%d' % file_size, self.results['latency']):
523 self.gsutil_api.DeleteObject(url.bucket_name, url.object_name, 735 self.gsutil_api.DeleteObject(url.bucket_name, url.object_name,
524 provider=self.provider) 736 provider=self.provider)
525 self._RunOperation(_Delete) 737 self._RunOperation(_Delete)
526 738
527 class _CpFilter(logging.Filter): 739 class _PerfdiagFilter(logging.Filter):
528 740
529 def filter(self, record): 741 def filter(self, record):
530 # Used to prevent cp._LogCopyOperation from spewing output from 742 # Used to prevent unnecessary output when using multiprocessing.
531 # subprocesses about every iteration.
532 msg = record.getMessage() 743 msg = record.getMessage()
533 return not (('Copying file:///' in msg) or ('Copying gs://' in msg) or 744 return not (('Copying file:///' in msg) or ('Copying gs://' in msg) or
534 ('Computing CRC' in msg)) 745 ('Computing CRC' in msg) or ('gsutil -m perfdiag' in msg))
535 746
536 def _PerfdiagExceptionHandler(self, e): 747 def _PerfdiagExceptionHandler(self, e):
537 """Simple exception handler to allow post-completion status.""" 748 """Simple exception handler to allow post-completion status."""
538 self.logger.error(str(e)) 749 self.logger.error(str(e))
539 750
540 def _RunReadThruTests(self): 751 def PerformFannedDownload(self, need_to_slice, object_names, file_names,
752 serialization_data):
753 """Performs a parallel download of multiple objects using the fan strategy.
754
755 Args:
756 need_to_slice: If True, additionally apply the slice strategy to each
757 object in object_names.
758 object_names: A list of object names to be downloaded. Each object must
759 already exist in the test bucket.
760 file_names: A list, corresponding by index to object_names, of file names
761 for downloaded data. If None, discard downloaded data.
762 serialization_data: A list, corresponding by index to object_names,
763 of serialization data for each object.
764 """
765 args = []
766 for i in range(len(object_names)):
767 file_name = file_names[i] if file_names else None
768 args.append(FanDownloadTuple(
769 need_to_slice, object_names[i], file_name,
770 serialization_data[i]))
771 self.Apply(_DownloadObject, args, _PerfdiagExceptionHandler,
772 ('total_requests', 'request_errors'),
773 arg_checker=DummyArgChecker, parallel_operations_override=True,
774 process_count=self.processes, thread_count=self.threads)
775
776 def PerformSlicedDownload(self, object_name, file_name, serialization_data):
777 """Performs a download of an object using the slice strategy.
778
779 Args:
780 object_name: The name of the object to download.
781 file_name: The name of the file to download data to, or None if data
782 should be discarded.
783 serialization_data: The serialization data for the object.
784 """
785 if file_name:
786 with open(file_name, 'ab') as fp:
787 fp.truncate(self.thru_filesize)
788 component_size = DivideAndCeil(self.thru_filesize, self.num_slices)
789 args = []
790 for i in range(self.num_slices):
791 start_byte = i * component_size
792 end_byte = min((i + 1) * (component_size) - 1, self.thru_filesize - 1)
793 args.append(SliceDownloadTuple(object_name, file_name, serialization_data,
794 start_byte, end_byte))
795 self.Apply(_DownloadSlice, args, _PerfdiagExceptionHandler,
796 ('total_requests', 'request_errors'),
797 arg_checker=DummyArgChecker, parallel_operations_override=True,
798 process_count=self.processes, thread_count=self.threads)
799
800 def PerformFannedUpload(self, need_to_slice, file_names, object_names,
801 use_file):
802 """Performs a parallel upload of multiple files using the fan strategy.
803
804 The metadata for file_name should be present in temp_file_dict prior
805 to calling. Also, the data for file_name should be present in temp_file_dict
806 if use_file is specified as False.
807
808 Args:
809 need_to_slice: If True, additionally apply the slice strategy to each
810 file in file_names.
811 file_names: A list of file names to be uploaded.
812 object_names: A list, corresponding by by index to file_names, of object
813 names to upload data to.
814 use_file: If true, use disk I/O, otherwise read upload data from memory.
815 """
816 args = []
817 for i in range(len(file_names)):
818 args.append(FanUploadTuple(
819 need_to_slice, file_names[i], object_names[i], use_file))
820 self.Apply(_UploadObject, args, _PerfdiagExceptionHandler,
821 ('total_requests', 'request_errors'),
822 arg_checker=DummyArgChecker, parallel_operations_override=True,
823 process_count=self.processes, thread_count=self.threads)
824
825 def PerformSlicedUpload(self, file_name, object_name, use_file):
826 """Performs a parallel upload of a file using the slice strategy.
827
828 The metadata for file_name should be present in temp_file_dict prior
829 to calling. Also, the data from for file_name should be present in
830 temp_file_dict if use_file is specified as False.
831
832 Args:
833 file_name: The name of the file to upload.
834 object_name: The name of the object to upload to.
835 use_file: If true, use disk I/O, otherwise read upload data from memory.
836 """
837 # Divide the file into components.
838 component_size = DivideAndCeil(self.thru_filesize, self.num_slices)
839 component_object_names = (
840 [object_name + str(i) for i in range(self.num_slices)])
841
842 args = []
843 for i in range(self.num_slices):
844 component_start = i * component_size
845 component_size = min(component_size,
846 temp_file_dict[file_name].size - component_start)
847 args.append(SliceUploadTuple(file_name, component_object_names[i],
848 use_file, component_start, component_size))
849
850 # Upload the components in parallel.
851 try:
852 self.Apply(_UploadSlice, args, _PerfdiagExceptionHandler,
853 ('total_requests', 'request_errors'),
854 arg_checker=DummyArgChecker, parallel_operations_override=True,
855 process_count=self.processes, thread_count=self.threads)
856
857 # Compose the components into an object.
858 request_components = []
859 for i in range(self.num_slices):
860 src_obj_metadata = (
861 apitools_messages.ComposeRequest.SourceObjectsValueListEntry(
862 name=component_object_names[i]))
863 request_components.append(src_obj_metadata)
864
865 dst_obj_metadata = apitools_messages.Object()
866 dst_obj_metadata.name = object_name
867 dst_obj_metadata.bucket = self.bucket_url.bucket_name
868 def _Compose():
869 self.gsutil_api.ComposeObject(request_components, dst_obj_metadata,
870 provider=self.provider)
871 self._RunOperation(_Compose)
872 finally:
873 # Delete the temporary components.
874 self.Apply(_DeleteWrapper, component_object_names,
875 _PerfdiagExceptionHandler,
876 ('total_requests', 'request_errors'),
877 arg_checker=DummyArgChecker, parallel_operations_override=True,
878 process_count=self.processes, thread_count=self.threads)
879
880 def _RunReadThruTests(self, use_file=False):
541 """Runs read throughput tests.""" 881 """Runs read throughput tests."""
882 test_name = 'read_throughput_file' if use_file else 'read_throughput'
883 file_io_string = 'with file I/O' if use_file else ''
542 self.logger.info( 884 self.logger.info(
543 '\nRunning read throughput tests (%s iterations of size %s)' % 885 '\nRunning read throughput tests %s (%s objects of size %s)' %
544 (self.num_iterations, MakeHumanReadable(self.thru_filesize))) 886 (file_io_string, self.num_objects,
545 887 MakeHumanReadable(self.thru_filesize)))
546 self.results['read_throughput'] = {'file_size': self.thru_filesize, 888 self._WarnIfLargeData()
547 'num_times': self.num_iterations, 889
548 'processes': self.processes, 890 self.results[test_name] = {'file_size': self.thru_filesize,
549 'threads': self.threads} 891 'processes': self.processes,
550 892 'threads': self.threads,
551 # Copy the TCP warmup file. 893 'parallelism': self.parallel_strategy
552 warmup_url = self.bucket_url.Clone() 894 }
553 warmup_url.object_name = os.path.basename(self.tcp_warmup_file) 895
554 warmup_target = StorageUrlToUploadObjectMetadata(warmup_url) 896 # Copy the file(s) to the test bucket, and also get the serialization data
555 self.test_object_names.add(warmup_url.object_name) 897 # so that we can pass it to download.
556 898 if use_file:
557 def _Upload1(): 899 # For test with file I/O use N files on disk to preserve seek performance.
558 self.gsutil_api.UploadObject( 900 file_names = self.thru_file_names
559 cStringIO.StringIO(self.file_contents[self.tcp_warmup_file]), 901 object_names = self.thru_object_names
560 warmup_target, provider=self.provider, fields=['name']) 902 serialization_data = []
561 self._RunOperation(_Upload1) 903 for i in range(self.num_objects):
562 904 self.temporary_objects.add(self.thru_object_names[i])
563 # Copy the file to remote location before reading. 905 if self.WTHRU_FILE in self.diag_tests:
564 thru_url = self.bucket_url.Clone() 906 # If we ran the WTHRU_FILE test, then the objects already exist.
565 thru_url.object_name = os.path.basename(self.thru_local_file) 907 obj_metadata = self.gsutil_api.GetObjectMetadata(
566 thru_target = StorageUrlToUploadObjectMetadata(thru_url) 908 self.bucket_url.bucket_name, self.thru_object_names[i],
567 thru_target.md5Hash = self.file_md5s[self.thru_local_file] 909 fields=['size', 'mediaLink'], provider=self.bucket_url.scheme)
568 self.test_object_names.add(thru_url.object_name) 910 else:
569 911 obj_metadata = self.Upload(self.thru_file_names[i],
570 # Get the mediaLink here so that we can pass it to download. 912 self.thru_object_names[i], use_file)
571 def _Upload2(): 913
572 return self.gsutil_api.UploadObject( 914 # File overwrite causes performance issues with sliced downloads.
573 cStringIO.StringIO(self.file_contents[self.thru_local_file]), 915 # Delete the file and reopen it for download. This matches what a real
574 thru_target, provider=self.provider, size=self.thru_filesize, 916 # download would look like.
575 fields=['name', 'mediaLink', 'size']) 917 os.unlink(self.thru_file_names[i])
576 918 open(self.thru_file_names[i], 'ab').close()
577 # Get the metadata for the object so that we are just measuring performance 919 serialization_data.append(GetDownloadSerializationData(obj_metadata))
578 # on the actual bytes transfer. 920 else:
579 download_metadata = self._RunOperation(_Upload2) 921 # For in-memory test only use one file but copy it num_objects times, to
580 serialization_dict = GetDownloadSerializationDict(download_metadata) 922 # allow scalability in num_objects.
581 serialization_data = json.dumps(serialization_dict) 923 self.temporary_objects.add(self.mem_thru_object_name)
582 924 obj_metadata = self.Upload(self.mem_thru_file_name,
925 self.mem_thru_object_name, use_file)
926 file_names = None
927 object_names = [self.mem_thru_object_name] * self.num_objects
928 serialization_data = (
929 [GetDownloadSerializationData(obj_metadata)] * self.num_objects)
930
931 # Warmup the TCP connection.
932 warmup_obj_name = os.path.basename(self.tcp_warmup_file)
933 self.temporary_objects.add(warmup_obj_name)
934 self.Upload(self.tcp_warmup_file, warmup_obj_name)
935 self.Download(warmup_obj_name)
936
937 t0 = time.time()
583 if self.processes == 1 and self.threads == 1: 938 if self.processes == 1 and self.threads == 1:
584 939 for i in range(self.num_objects):
585 # Warm up the TCP connection. 940 file_name = file_names[i] if use_file else None
586 def _Warmup(): 941 self.Download(object_names[i], file_name, serialization_data[i])
587 self.gsutil_api.GetObjectMedia(warmup_url.bucket_name,
588 warmup_url.object_name,
589 self.discard_sink,
590 provider=self.provider)
591 self._RunOperation(_Warmup)
592
593 times = []
594
595 def _Download():
596 t0 = time.time()
597 self.gsutil_api.GetObjectMedia(
598 thru_url.bucket_name, thru_url.object_name, self.discard_sink,
599 provider=self.provider, serialization_data=serialization_data)
600 t1 = time.time()
601 times.append(t1 - t0)
602 for _ in range(self.num_iterations):
603 self._RunOperation(_Download)
604 time_took = sum(times)
605 else: 942 else:
606 args = ([(thru_url.bucket_name, thru_url.object_name, serialization_data)] 943 if self.parallel_strategy in (self.FAN, self.BOTH):
607 * self.num_iterations) 944 need_to_slice = (self.parallel_strategy == self.BOTH)
608 self.logger.addFilter(self._CpFilter()) 945 self.PerformFannedDownload(need_to_slice, object_names, file_names,
609 946 serialization_data)
610 t0 = time.time() 947 elif self.parallel_strategy == self.SLICE:
611 self.Apply(_DownloadWrapper, 948 for i in range(self.num_objects):
612 args, 949 file_name = file_names[i] if use_file else None
613 _PerfdiagExceptionHandler, 950 self.PerformSlicedDownload(
614 arg_checker=DummyArgChecker, 951 object_names[i], file_name, serialization_data[i])
615 parallel_operations_override=True, 952 t1 = time.time()
616 process_count=self.processes, 953
617 thread_count=self.threads) 954 time_took = t1 - t0
618 t1 = time.time() 955 total_bytes_copied = self.thru_filesize * self.num_objects
619 time_took = t1 - t0
620
621 total_bytes_copied = self.thru_filesize * self.num_iterations
622 bytes_per_second = total_bytes_copied / time_took 956 bytes_per_second = total_bytes_copied / time_took
623 957
624 self.results['read_throughput']['time_took'] = time_took 958 self.results[test_name]['time_took'] = time_took
625 self.results['read_throughput']['total_bytes_copied'] = total_bytes_copied 959 self.results[test_name]['total_bytes_copied'] = total_bytes_copied
626 self.results['read_throughput']['bytes_per_second'] = bytes_per_second 960 self.results[test_name]['bytes_per_second'] = bytes_per_second
627 961
628 def _RunWriteThruTests(self): 962 def _RunWriteThruTests(self, use_file=False):
629 """Runs write throughput tests.""" 963 """Runs write throughput tests."""
964 test_name = 'write_throughput_file' if use_file else 'write_throughput'
965 file_io_string = 'with file I/O' if use_file else ''
630 self.logger.info( 966 self.logger.info(
631 '\nRunning write throughput tests (%s iterations of size %s)' % 967 '\nRunning write throughput tests %s (%s objects of size %s)' %
632 (self.num_iterations, MakeHumanReadable(self.thru_filesize))) 968 (file_io_string, self.num_objects,
633 969 MakeHumanReadable(self.thru_filesize)))
634 self.results['write_throughput'] = {'file_size': self.thru_filesize, 970 self._WarnIfLargeData()
635 'num_copies': self.num_iterations, 971
636 'processes': self.processes, 972 self.results[test_name] = {'file_size': self.thru_filesize,
637 'threads': self.threads} 973 'processes': self.processes,
638 974 'threads': self.threads,
639 warmup_url = self.bucket_url.Clone() 975 'parallelism': self.parallel_strategy}
640 warmup_url.object_name = os.path.basename(self.tcp_warmup_file) 976
641 warmup_target = StorageUrlToUploadObjectMetadata(warmup_url) 977 # Warmup the TCP connection.
642 self.test_object_names.add(warmup_url.object_name) 978 warmup_obj_name = os.path.basename(self.tcp_warmup_file)
643 979 self.temporary_objects.add(warmup_obj_name)
644 thru_url = self.bucket_url.Clone() 980 self.Upload(self.tcp_warmup_file, warmup_obj_name)
645 thru_url.object_name = os.path.basename(self.thru_local_file) 981
646 thru_target = StorageUrlToUploadObjectMetadata(thru_url) 982 if use_file:
647 thru_tuples = [] 983 # For test with file I/O use N files on disk to preserve seek performance.
648 for i in xrange(self.num_iterations): 984 file_names = self.thru_file_names
649 # Create a unique name for each uploaded object. Otherwise, 985 object_names = self.thru_object_names
650 # the XML API would fail when trying to non-atomically get metadata 986 else:
651 # for the object that gets blown away by the overwrite. 987 # For in-memory test only use one file but copy it num_objects times, to
652 remote_object_name = thru_target.name + str(i) 988 # allow for scalability in num_objects.
653 self.test_object_names.add(remote_object_name) 989 file_names = [self.mem_thru_file_name] * self.num_objects
654 thru_tuples.append(UploadObjectTuple(thru_target.bucket, 990 object_names = (
655 remote_object_name, 991 [self.mem_thru_object_name + str(i) for i in range(self.num_objects)])
656 filepath=self.thru_local_file)) 992
657 993 for object_name in object_names:
994 self.temporary_objects.add(object_name)
995
996 t0 = time.time()
658 if self.processes == 1 and self.threads == 1: 997 if self.processes == 1 and self.threads == 1:
659 # Warm up the TCP connection. 998 for i in range(self.num_objects):
660 def _Warmup(): 999 self.Upload(file_names[i], object_names[i], use_file)
661 self.gsutil_api.UploadObject(
662 cStringIO.StringIO(self.file_contents[self.tcp_warmup_file]),
663 warmup_target, provider=self.provider, size=self.thru_filesize,
664 fields=['name'])
665 self._RunOperation(_Warmup)
666
667 times = []
668
669 for i in xrange(self.num_iterations):
670 thru_tuple = thru_tuples[i]
671 def _Upload():
672 """Uploads the write throughput measurement object."""
673 upload_target = apitools_messages.Object(
674 bucket=thru_tuple.bucket_name, name=thru_tuple.object_name,
675 md5Hash=thru_tuple.md5)
676 io_fp = cStringIO.StringIO(self.file_contents[self.thru_local_file])
677 t0 = time.time()
678 if self.thru_filesize < ResumableThreshold():
679 self.gsutil_api.UploadObject(
680 io_fp, upload_target, provider=self.provider,
681 size=self.thru_filesize, fields=['name'])
682 else:
683 self.gsutil_api.UploadObjectResumable(
684 io_fp, upload_target, provider=self.provider,
685 size=self.thru_filesize, fields=['name'],
686 tracker_callback=_DummyTrackerCallback)
687
688 t1 = time.time()
689 times.append(t1 - t0)
690
691 self._RunOperation(_Upload)
692 time_took = sum(times)
693
694 else: 1000 else:
695 args = thru_tuples 1001 if self.parallel_strategy in (self.FAN, self.BOTH):
696 t0 = time.time() 1002 need_to_slice = (self.parallel_strategy == self.BOTH)
697 self.Apply(_UploadWrapper, 1003 self.PerformFannedUpload(need_to_slice, file_names, object_names,
698 args, 1004 use_file)
699 _PerfdiagExceptionHandler, 1005 elif self.parallel_strategy == self.SLICE:
700 arg_checker=DummyArgChecker, 1006 for i in range(self.num_objects):
701 parallel_operations_override=True, 1007 self.PerformSlicedUpload(file_names[i], object_names[i], use_file)
702 process_count=self.processes, 1008 t1 = time.time()
703 thread_count=self.threads) 1009
704 t1 = time.time() 1010 time_took = t1 - t0
705 time_took = t1 - t0 1011 total_bytes_copied = self.thru_filesize * self.num_objects
706
707 total_bytes_copied = self.thru_filesize * self.num_iterations
708 bytes_per_second = total_bytes_copied / time_took 1012 bytes_per_second = total_bytes_copied / time_took
709 1013
710 self.results['write_throughput']['time_took'] = time_took 1014 self.results[test_name]['time_took'] = time_took
711 self.results['write_throughput']['total_bytes_copied'] = total_bytes_copied 1015 self.results[test_name]['total_bytes_copied'] = total_bytes_copied
712 self.results['write_throughput']['bytes_per_second'] = bytes_per_second 1016 self.results[test_name]['bytes_per_second'] = bytes_per_second
713 1017
714 def _RunListTests(self): 1018 def _RunListTests(self):
715 """Runs eventual consistency listing latency tests.""" 1019 """Runs eventual consistency listing latency tests."""
716 self.results['listing'] = {'num_files': self.num_iterations} 1020 self.results['listing'] = {'num_files': self.num_objects}
717 1021
718 # Generate N random object names to put in the bucket. 1022 # Generate N random objects to put into the bucket.
719 list_prefix = 'gsutil-perfdiag-list-' 1023 list_prefix = 'gsutil-perfdiag-list-'
1024 list_fpaths = []
720 list_objects = [] 1025 list_objects = []
721 for _ in xrange(self.num_iterations): 1026 args = []
722 list_object_name = u'%s%s' % (list_prefix, os.urandom(20).encode('hex')) 1027 for _ in xrange(self.num_objects):
723 self.test_object_names.add(list_object_name) 1028 fpath = self._MakeTempFile(0, mem_data=True, mem_metadata=True,
724 list_objects.append(list_object_name) 1029 prefix=list_prefix)
1030 list_fpaths.append(fpath)
1031 object_name = os.path.basename(fpath)
1032 list_objects.append(object_name)
1033 args.append(FanUploadTuple(False, fpath, object_name, False))
1034 self.temporary_objects.add(object_name)
725 1035
726 # Add the objects to the bucket. 1036 # Add the objects to the bucket.
727 self.logger.info( 1037 self.logger.info(
728 '\nWriting %s objects for listing test...', self.num_iterations) 1038 '\nWriting %s objects for listing test...', self.num_objects)
729 empty_md5 = CalculateB64EncodedMd5FromContents(cStringIO.StringIO('')) 1039
730 args = [ 1040 self.Apply(_UploadObject, args, _PerfdiagExceptionHandler,
731 UploadObjectTuple(self.bucket_url.bucket_name, name, md5=empty_md5,
732 contents='') for name in list_objects]
733 self.Apply(_UploadWrapper, args, _PerfdiagExceptionHandler,
734 arg_checker=DummyArgChecker) 1041 arg_checker=DummyArgChecker)
735 1042
736 list_latencies = [] 1043 list_latencies = []
737 files_seen = [] 1044 files_seen = []
738 total_start_time = time.time() 1045 total_start_time = time.time()
739 expected_objects = set(list_objects) 1046 expected_objects = set(list_objects)
740 found_objects = set() 1047 found_objects = set()
741 1048
742 def _List(): 1049 def _List():
743 """Lists and returns objects in the bucket. Also records latency.""" 1050 """Lists and returns objects in the bucket. Also records latency."""
744 t0 = time.time() 1051 t0 = time.time()
745 objects = list(self.gsutil_api.ListObjects( 1052 objects = list(self.gsutil_api.ListObjects(
746 self.bucket_url.bucket_name, prefix=list_prefix, delimiter='/', 1053 self.bucket_url.bucket_name, delimiter='/',
747 provider=self.provider, fields=['items/name'])) 1054 provider=self.provider, fields=['items/name']))
748 t1 = time.time() 1055 t1 = time.time()
749 list_latencies.append(t1 - t0) 1056 list_latencies.append(t1 - t0)
750 return set([obj.data.name for obj in objects]) 1057 return set([obj.data.name for obj in objects])
751 1058
752 self.logger.info( 1059 self.logger.info(
753 'Listing bucket %s waiting for %s objects to appear...', 1060 'Listing bucket %s waiting for %s objects to appear...',
754 self.bucket_url.bucket_name, self.num_iterations) 1061 self.bucket_url.bucket_name, self.num_objects)
755 while expected_objects - found_objects: 1062 while expected_objects - found_objects:
756 def _ListAfterUpload(): 1063 def _ListAfterUpload():
757 names = _List() 1064 names = _List()
758 found_objects.update(names & expected_objects) 1065 found_objects.update(names & expected_objects)
759 files_seen.append(len(found_objects)) 1066 files_seen.append(len(found_objects))
760 self._RunOperation(_ListAfterUpload) 1067 self._RunOperation(_ListAfterUpload)
761 if expected_objects - found_objects: 1068 if expected_objects - found_objects:
762 if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME: 1069 if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME:
763 self.logger.warning('Maximum time reached waiting for listing.') 1070 self.logger.warning('Maximum time reached waiting for listing.')
764 break 1071 break
765 total_end_time = time.time() 1072 total_end_time = time.time()
766 1073
767 self.results['listing']['insert'] = { 1074 self.results['listing']['insert'] = {
768 'num_listing_calls': len(list_latencies), 1075 'num_listing_calls': len(list_latencies),
769 'list_latencies': list_latencies, 1076 'list_latencies': list_latencies,
770 'files_seen_after_listing': files_seen, 1077 'files_seen_after_listing': files_seen,
771 'time_took': total_end_time - total_start_time, 1078 'time_took': total_end_time - total_start_time,
772 } 1079 }
773 1080
1081 args = [object_name for object_name in list_objects]
774 self.logger.info( 1082 self.logger.info(
775 'Deleting %s objects for listing test...', self.num_iterations) 1083 'Deleting %s objects for listing test...', self.num_objects)
776 self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler, 1084 self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler,
777 arg_checker=DummyArgChecker) 1085 arg_checker=DummyArgChecker)
778 1086
779 self.logger.info( 1087 self.logger.info(
780 'Listing bucket %s waiting for %s objects to disappear...', 1088 'Listing bucket %s waiting for %s objects to disappear...',
781 self.bucket_url.bucket_name, self.num_iterations) 1089 self.bucket_url.bucket_name, self.num_objects)
782 list_latencies = [] 1090 list_latencies = []
783 files_seen = [] 1091 files_seen = []
784 total_start_time = time.time() 1092 total_start_time = time.time()
785 found_objects = set(list_objects) 1093 found_objects = set(list_objects)
786 while found_objects: 1094 while found_objects:
787 def _ListAfterDelete(): 1095 def _ListAfterDelete():
788 names = _List() 1096 names = _List()
789 found_objects.intersection_update(names) 1097 found_objects.intersection_update(names)
790 files_seen.append(len(found_objects)) 1098 files_seen.append(len(found_objects))
791 self._RunOperation(_ListAfterDelete) 1099 self._RunOperation(_ListAfterDelete)
792 if found_objects: 1100 if found_objects:
793 if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME: 1101 if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME:
794 self.logger.warning('Maximum time reached waiting for listing.') 1102 self.logger.warning('Maximum time reached waiting for listing.')
795 break 1103 break
796 total_end_time = time.time() 1104 total_end_time = time.time()
797 1105
798 self.results['listing']['delete'] = { 1106 self.results['listing']['delete'] = {
799 'num_listing_calls': len(list_latencies), 1107 'num_listing_calls': len(list_latencies),
800 'list_latencies': list_latencies, 1108 'list_latencies': list_latencies,
801 'files_seen_after_listing': files_seen, 1109 'files_seen_after_listing': files_seen,
802 'time_took': total_end_time - total_start_time, 1110 'time_took': total_end_time - total_start_time,
803 } 1111 }
804 1112
805 def Upload(self, thru_tuple, thread_state=None): 1113 def Upload(self, file_name, object_name, use_file=False, file_start=0,
806 gsutil_api = GetCloudApiInstance(self, thread_state) 1114 file_size=None):
1115 """Performs an upload to the test bucket.
807 1116
808 md5hash = thru_tuple.md5 1117 The file is uploaded to the bucket referred to by self.bucket_url, and has
809 contents = thru_tuple.contents 1118 name object_name.
810 if thru_tuple.filepath:
811 md5hash = self.file_md5s[thru_tuple.filepath]
812 contents = self.file_contents[thru_tuple.filepath]
813
814 upload_target = apitools_messages.Object(
815 bucket=thru_tuple.bucket_name, name=thru_tuple.object_name,
816 md5Hash=md5hash)
817 file_size = len(contents)
818 if file_size < ResumableThreshold():
819 gsutil_api.UploadObject(
820 cStringIO.StringIO(contents), upload_target,
821 provider=self.provider, size=file_size, fields=['name'])
822 else:
823 gsutil_api.UploadObjectResumable(
824 cStringIO.StringIO(contents), upload_target,
825 provider=self.provider, size=file_size, fields=['name'],
826 tracker_callback=_DummyTrackerCallback)
827
828 def Download(self, download_tuple, thread_state=None):
829 """Downloads a file.
830 1119
831 Args: 1120 Args:
832 download_tuple: (bucket name, object name, serialization data for object). 1121 file_name: The path to the local file, and the key to its entry in
833 thread_state: gsutil Cloud API instance to use for the download. 1122 temp_file_dict.
1123 object_name: The name of the remote object.
1124 use_file: If true, use disk I/O, otherwise read everything from memory.
1125 file_start: The first byte in the file to upload to the object.
1126 (only should be specified for sliced uploads)
1127 file_size: The size of the file to upload.
1128 (only should be specified for sliced uploads)
1129
1130 Returns:
1131 Uploaded Object Metadata.
834 """ 1132 """
835 gsutil_api = GetCloudApiInstance(self, thread_state) 1133 fp = None
836 gsutil_api.GetObjectMedia( 1134 if file_size is None:
837 download_tuple[0], download_tuple[1], self.discard_sink, 1135 file_size = temp_file_dict[file_name].size
838 provider=self.provider, serialization_data=download_tuple[2])
839 1136
840 def Delete(self, thru_tuple, thread_state=None): 1137 upload_url = self.bucket_url.Clone()
841 gsutil_api = thread_state or self.gsutil_api 1138 upload_url.object_name = object_name
842 gsutil_api.DeleteObject( 1139 upload_target = StorageUrlToUploadObjectMetadata(upload_url)
843 thru_tuple.bucket_name, thru_tuple.object_name, provider=self.provider) 1140
1141 try:
1142 if use_file:
1143 fp = FilePart(file_name, file_start, file_size)
1144 else:
1145 data = temp_file_dict[file_name].data[file_start:file_start+file_size]
1146 fp = cStringIO.StringIO(data)
1147
1148 def _InnerUpload():
1149 if file_size < ResumableThreshold():
1150 return self.gsutil_api.UploadObject(
1151 fp, upload_target, provider=self.provider, size=file_size,
1152 fields=['name', 'mediaLink', 'size'])
1153 else:
1154 return self.gsutil_api.UploadObjectResumable(
1155 fp, upload_target, provider=self.provider, size=file_size,
1156 fields=['name', 'mediaLink', 'size'],
1157 tracker_callback=_DummyTrackerCallback)
1158 return self._RunOperation(_InnerUpload)
1159 finally:
1160 if fp:
1161 fp.close()
1162
1163 def Download(self, object_name, file_name=None, serialization_data=None,
1164 start_byte=0, end_byte=None):
1165 """Downloads an object from the test bucket.
1166
1167 Args:
1168 object_name: The name of the object (in the test bucket) to download.
1169 file_name: Optional file name to write downloaded data to. If None,
1170 downloaded data is discarded immediately.
1171 serialization_data: Optional serialization data, used so that we don't
1172 have to get the metadata before downloading.
1173 start_byte: The first byte in the object to download.
1174 (only should be specified for sliced downloads)
1175 end_byte: The last byte in the object to download.
1176 (only should be specified for sliced downloads)
1177 """
1178 fp = None
1179 try:
1180 if file_name is not None:
1181 fp = open(file_name, 'r+b')
1182 fp.seek(start_byte)
1183 else:
1184 fp = self.discard_sink
1185
1186 def _InnerDownload():
1187 self.gsutil_api.GetObjectMedia(
1188 self.bucket_url.bucket_name, object_name, fp,
1189 provider=self.provider, start_byte=start_byte, end_byte=end_byte,
1190 serialization_data=serialization_data)
1191 self._RunOperation(_InnerDownload)
1192 finally:
1193 if fp:
1194 fp.close()
1195
1196 def Delete(self, object_name):
1197 """Deletes an object from the test bucket.
1198
1199 Args:
1200 object_name: The name of the object to delete.
1201 """
1202 try:
1203 def _InnerDelete():
1204 self.gsutil_api.DeleteObject(self.bucket_url.bucket_name, object_name,
1205 provider=self.provider)
1206 self._RunOperation(_InnerDelete)
1207 except NotFoundException:
1208 pass
844 1209
845 def _GetDiskCounters(self): 1210 def _GetDiskCounters(self):
846 """Retrieves disk I/O statistics for all disks. 1211 """Retrieves disk I/O statistics for all disks.
847 1212
848 Adapted from the psutil module's psutil._pslinux.disk_io_counters: 1213 Adapted from the psutil module's psutil._pslinux.disk_io_counters:
849 http://code.google.com/p/psutil/source/browse/trunk/psutil/_pslinux.py 1214 http://code.google.com/p/psutil/source/browse/trunk/psutil/_pslinux.py
850 1215
851 Originally distributed under under a BSD license. 1216 Originally distributed under under a BSD license.
852 Original Copyright (c) 2009, Jay Loden, Dave Daeschler, Giampaolo Rodola. 1217 Original Copyright (c) 2009, Jay Loden, Dave Daeschler, Giampaolo Rodola.
853 1218
(...skipping 105 matching lines...) Expand 10 before | Expand all | Expand 10 after
959 self.logger.info('DNS lookups are disallowed in this environment, so ' 1324 self.logger.info('DNS lookups are disallowed in this environment, so '
960 'some information is not included in this perfdiag run.') 1325 'some information is not included in this perfdiag run.')
961 1326
962 # Get the local IP address from socket lib. 1327 # Get the local IP address from socket lib.
963 try: 1328 try:
964 sysinfo['ip_address'] = socket.gethostbyname(socket.gethostname()) 1329 sysinfo['ip_address'] = socket.gethostbyname(socket.gethostname())
965 except socket_errors: 1330 except socket_errors:
966 sysinfo['ip_address'] = '' 1331 sysinfo['ip_address'] = ''
967 # Record the temporary directory used since it can affect performance, e.g. 1332 # Record the temporary directory used since it can affect performance, e.g.
968 # when on a networked filesystem. 1333 # when on a networked filesystem.
969 sysinfo['tempdir'] = tempfile.gettempdir() 1334 sysinfo['tempdir'] = self.directory
970 1335
971 # Produces an RFC 2822 compliant GMT timestamp. 1336 # Produces an RFC 2822 compliant GMT timestamp.
972 sysinfo['gmt_timestamp'] = time.strftime('%a, %d %b %Y %H:%M:%S +0000', 1337 sysinfo['gmt_timestamp'] = time.strftime('%a, %d %b %Y %H:%M:%S +0000',
973 time.gmtime()) 1338 time.gmtime())
974 1339
975 # Execute a CNAME lookup on Google DNS to find what Google server 1340 # Execute a CNAME lookup on Google DNS to find what Google server
976 # it's routing to. 1341 # it's routing to.
977 cmd = ['nslookup', '-type=CNAME', self.XML_API_HOST] 1342 cmd = ['nslookup', '-type=CNAME', self.XML_API_HOST]
978 try: 1343 try:
979 nslookup_cname_output = self._Exec(cmd, return_output=True) 1344 nslookup_cname_output = self._Exec(cmd, return_output=True)
(...skipping 191 matching lines...) Expand 10 before | Expand all | Expand 10 after
1171 print 'Delete'.rjust(9), '', 1536 print 'Delete'.rjust(9), '',
1172 print MakeHumanReadable(numbytes).rjust(9), '', 1537 print MakeHumanReadable(numbytes).rjust(9), '',
1173 self._DisplayStats(trials) 1538 self._DisplayStats(trials)
1174 1539
1175 if 'write_throughput' in self.results: 1540 if 'write_throughput' in self.results:
1176 print 1541 print
1177 print '-' * 78 1542 print '-' * 78
1178 print 'Write Throughput'.center(78) 1543 print 'Write Throughput'.center(78)
1179 print '-' * 78 1544 print '-' * 78
1180 write_thru = self.results['write_throughput'] 1545 write_thru = self.results['write_throughput']
1181 print 'Copied a %s file %d times for a total transfer size of %s.' % ( 1546 print 'Copied %s %s file(s) for a total transfer size of %s.' % (
1547 self.num_objects,
1182 MakeHumanReadable(write_thru['file_size']), 1548 MakeHumanReadable(write_thru['file_size']),
1183 write_thru['num_copies'],
1184 MakeHumanReadable(write_thru['total_bytes_copied'])) 1549 MakeHumanReadable(write_thru['total_bytes_copied']))
1185 print 'Write throughput: %s/s.' % ( 1550 print 'Write throughput: %s/s.' % (
1186 MakeBitsHumanReadable(write_thru['bytes_per_second'] * 8)) 1551 MakeBitsHumanReadable(write_thru['bytes_per_second'] * 8))
1552 print 'Parallelism strategy: %s' % write_thru['parallelism']
1553
1554 if 'write_throughput_file' in self.results:
1555 print
1556 print '-' * 78
1557 print 'Write Throughput With File I/O'.center(78)
1558 print '-' * 78
1559 write_thru_file = self.results['write_throughput_file']
1560 print 'Copied %s %s file(s) for a total transfer size of %s.' % (
1561 self.num_objects,
1562 MakeHumanReadable(write_thru_file['file_size']),
1563 MakeHumanReadable(write_thru_file['total_bytes_copied']))
1564 print 'Write throughput: %s/s.' % (
1565 MakeBitsHumanReadable(write_thru_file['bytes_per_second'] * 8))
1566 print 'Parallelism strategy: %s' % write_thru_file['parallelism']
1187 1567
1188 if 'read_throughput' in self.results: 1568 if 'read_throughput' in self.results:
1189 print 1569 print
1190 print '-' * 78 1570 print '-' * 78
1191 print 'Read Throughput'.center(78) 1571 print 'Read Throughput'.center(78)
1192 print '-' * 78 1572 print '-' * 78
1193 read_thru = self.results['read_throughput'] 1573 read_thru = self.results['read_throughput']
1194 print 'Copied a %s file %d times for a total transfer size of %s.' % ( 1574 print 'Copied %s %s file(s) for a total transfer size of %s.' % (
1575 self.num_objects,
1195 MakeHumanReadable(read_thru['file_size']), 1576 MakeHumanReadable(read_thru['file_size']),
1196 read_thru['num_times'],
1197 MakeHumanReadable(read_thru['total_bytes_copied'])) 1577 MakeHumanReadable(read_thru['total_bytes_copied']))
1198 print 'Read throughput: %s/s.' % ( 1578 print 'Read throughput: %s/s.' % (
1199 MakeBitsHumanReadable(read_thru['bytes_per_second'] * 8)) 1579 MakeBitsHumanReadable(read_thru['bytes_per_second'] * 8))
1580 print 'Parallelism strategy: %s' % read_thru['parallelism']
1581
1582 if 'read_throughput_file' in self.results:
1583 print
1584 print '-' * 78
1585 print 'Read Throughput With File I/O'.center(78)
1586 print '-' * 78
1587 read_thru_file = self.results['read_throughput_file']
1588 print 'Copied %s %s file(s) for a total transfer size of %s.' % (
1589 self.num_objects,
1590 MakeHumanReadable(read_thru_file['file_size']),
1591 MakeHumanReadable(read_thru_file['total_bytes_copied']))
1592 print 'Read throughput: %s/s.' % (
1593 MakeBitsHumanReadable(read_thru_file['bytes_per_second'] * 8))
1594 print 'Parallelism strategy: %s' % read_thru_file['parallelism']
1200 1595
1201 if 'listing' in self.results: 1596 if 'listing' in self.results:
1202 print 1597 print
1203 print '-' * 78 1598 print '-' * 78
1204 print 'Listing'.center(78) 1599 print 'Listing'.center(78)
1205 print '-' * 78 1600 print '-' * 78
1206 1601
1207 listing = self.results['listing'] 1602 listing = self.results['listing']
1208 insert = listing['insert'] 1603 insert = listing['insert']
1209 delete = listing['delete'] 1604 delete = listing['delete']
(...skipping 172 matching lines...) Expand 10 before | Expand all | Expand 10 after
1382 val = int(val) 1777 val = int(val)
1383 if val < 1: 1778 if val < 1:
1384 raise CommandException(msg) 1779 raise CommandException(msg)
1385 return val 1780 return val
1386 except ValueError: 1781 except ValueError:
1387 raise CommandException(msg) 1782 raise CommandException(msg)
1388 1783
1389 def _ParseArgs(self): 1784 def _ParseArgs(self):
1390 """Parses arguments for perfdiag command.""" 1785 """Parses arguments for perfdiag command."""
1391 # From -n. 1786 # From -n.
1392 self.num_iterations = 5 1787 self.num_objects = 5
1393 # From -c. 1788 # From -c.
1394 self.processes = 1 1789 self.processes = 1
1395 # From -k. 1790 # From -k.
1396 self.threads = 1 1791 self.threads = 1
1792 # From -p
1793 self.parallel_strategy = None
1794 # From -y
1795 self.num_slices = 4
1397 # From -s. 1796 # From -s.
1398 self.thru_filesize = 1048576 1797 self.thru_filesize = 1048576
1798 # From -d.
1799 self.directory = tempfile.gettempdir()
1800 # Keep track of whether or not to delete the directory upon completion.
1801 self.delete_directory = False
1399 # From -t. 1802 # From -t.
1400 self.diag_tests = self.DEFAULT_DIAG_TESTS 1803 self.diag_tests = set(self.DEFAULT_DIAG_TESTS)
1401 # From -o. 1804 # From -o.
1402 self.output_file = None 1805 self.output_file = None
1403 # From -i. 1806 # From -i.
1404 self.input_file = None 1807 self.input_file = None
1405 # From -m. 1808 # From -m.
1406 self.metadata_keys = {} 1809 self.metadata_keys = {}
1407 1810
1408 if self.sub_opts: 1811 if self.sub_opts:
1409 for o, a in self.sub_opts: 1812 for o, a in self.sub_opts:
1410 if o == '-n': 1813 if o == '-n':
1411 self.num_iterations = self._ParsePositiveInteger( 1814 self.num_objects = self._ParsePositiveInteger(
1412 a, 'The -n parameter must be a positive integer.') 1815 a, 'The -n parameter must be a positive integer.')
1413 if o == '-c': 1816 if o == '-c':
1414 self.processes = self._ParsePositiveInteger( 1817 self.processes = self._ParsePositiveInteger(
1415 a, 'The -c parameter must be a positive integer.') 1818 a, 'The -c parameter must be a positive integer.')
1416 if o == '-k': 1819 if o == '-k':
1417 self.threads = self._ParsePositiveInteger( 1820 self.threads = self._ParsePositiveInteger(
1418 a, 'The -k parameter must be a positive integer.') 1821 a, 'The -k parameter must be a positive integer.')
1822 if o == '-p':
1823 if a.lower() in self.PARALLEL_STRATEGIES:
1824 self.parallel_strategy = a.lower()
1825 else:
1826 raise CommandException(
1827 "'%s' is not a valid parallelism strategy." % a)
1828 if o == '-y':
1829 self.num_slices = self._ParsePositiveInteger(
1830 a, 'The -y parameter must be a positive integer.')
1419 if o == '-s': 1831 if o == '-s':
1420 try: 1832 try:
1421 self.thru_filesize = HumanReadableToBytes(a) 1833 self.thru_filesize = HumanReadableToBytes(a)
1422 except ValueError: 1834 except ValueError:
1423 raise CommandException('Invalid -s parameter.') 1835 raise CommandException('Invalid -s parameter.')
1424 if self.thru_filesize > (20 * 1024 ** 3): # Max 20 GiB. 1836 if o == '-d':
1425 raise CommandException( 1837 self.directory = a
1426 'Maximum throughput file size parameter (-s) is 20 GiB.') 1838 if not os.path.exists(self.directory):
1839 self.delete_directory = True
1840 os.makedirs(self.directory)
1427 if o == '-t': 1841 if o == '-t':
1428 self.diag_tests = [] 1842 self.diag_tests = set()
1429 for test_name in a.strip().split(','): 1843 for test_name in a.strip().split(','):
1430 if test_name.lower() not in self.ALL_DIAG_TESTS: 1844 if test_name.lower() not in self.ALL_DIAG_TESTS:
1431 raise CommandException("List of test names (-t) contains invalid " 1845 raise CommandException("List of test names (-t) contains invalid "
1432 "test name '%s'." % test_name) 1846 "test name '%s'." % test_name)
1433 self.diag_tests.append(test_name) 1847 self.diag_tests.add(test_name)
1434 if o == '-m': 1848 if o == '-m':
1435 pieces = a.split(':') 1849 pieces = a.split(':')
1436 if len(pieces) != 2: 1850 if len(pieces) != 2:
1437 raise CommandException( 1851 raise CommandException(
1438 "Invalid metadata key-value combination '%s'." % a) 1852 "Invalid metadata key-value combination '%s'." % a)
1439 key, value = pieces 1853 key, value = pieces
1440 self.metadata_keys[key] = value 1854 self.metadata_keys[key] = value
1441 if o == '-o': 1855 if o == '-o':
1442 self.output_file = os.path.abspath(a) 1856 self.output_file = os.path.abspath(a)
1443 if o == '-i': 1857 if o == '-i':
1444 self.input_file = os.path.abspath(a) 1858 self.input_file = os.path.abspath(a)
1445 if not os.path.isfile(self.input_file): 1859 if not os.path.isfile(self.input_file):
1446 raise CommandException("Invalid input file (-i): '%s'." % a) 1860 raise CommandException("Invalid input file (-i): '%s'." % a)
1447 try: 1861 try:
1448 with open(self.input_file, 'r') as f: 1862 with open(self.input_file, 'r') as f:
1449 self.results = json.load(f) 1863 self.results = json.load(f)
1450 self.logger.info("Read input file: '%s'.", self.input_file) 1864 self.logger.info("Read input file: '%s'.", self.input_file)
1451 except ValueError: 1865 except ValueError:
1452 raise CommandException("Could not decode input file (-i): '%s'." % 1866 raise CommandException("Could not decode input file (-i): '%s'." %
1453 a) 1867 a)
1454 return 1868 return
1869
1870 # If parallelism is specified, default parallelism strategy to fan.
1871 if (self.processes > 1 or self.threads > 1) and not self.parallel_strategy:
1872 self.parallel_strategy = self.FAN
1873 elif self.processes == 1 and self.threads == 1 and self.parallel_strategy:
1874 raise CommandException(
1875 'Cannot specify parallelism strategy (-p) without also specifying '
1876 'multiple threads and/or processes (-c and/or -k).')
1877
1455 if not self.args: 1878 if not self.args:
1456 self.RaiseWrongNumberOfArgumentsException() 1879 self.RaiseWrongNumberOfArgumentsException()
1457 1880
1458 self.bucket_url = StorageUrlFromString(self.args[0]) 1881 self.bucket_url = StorageUrlFromString(self.args[0])
1459 self.provider = self.bucket_url.scheme 1882 self.provider = self.bucket_url.scheme
1460 if not (self.bucket_url.IsCloudUrl() and self.bucket_url.IsBucket()): 1883 if not self.bucket_url.IsCloudUrl() and self.bucket_url.IsBucket():
1461 raise CommandException('The perfdiag command requires a URL that ' 1884 raise CommandException('The perfdiag command requires a URL that '
1462 'specifies a bucket.\n"%s" is not ' 1885 'specifies a bucket.\n"%s" is not '
1463 'valid.' % self.args[0]) 1886 'valid.' % self.args[0])
1887
1888 if (self.thru_filesize > HumanReadableToBytes('2GiB') and
1889 (self.RTHRU in self.diag_tests or self.WTHRU in self.diag_tests)):
1890 raise CommandException(
1891 'For in-memory tests maximum file size is 2GiB. For larger file '
1892 'sizes, specify rthru_file and/or wthru_file with the -t option.')
1893
1894 perform_slice = self.parallel_strategy in (self.SLICE, self.BOTH)
1895 slice_not_available = (
1896 self.provider == 's3' and self.diag_tests.intersection(self.WTHRU,
1897 self.WTHRU_FILE))
1898 if perform_slice and slice_not_available:
1899 raise CommandException('Sliced uploads are not available for s3. '
1900 'Use -p fan or sequential uploads for s3.')
1901
1464 # Ensure the bucket exists. 1902 # Ensure the bucket exists.
1465 self.gsutil_api.GetBucket(self.bucket_url.bucket_name, 1903 self.gsutil_api.GetBucket(self.bucket_url.bucket_name,
1466 provider=self.bucket_url.scheme, 1904 provider=self.bucket_url.scheme,
1467 fields=['id']) 1905 fields=['id'])
1468 self.exceptions = [httplib.HTTPException, socket.error, socket.gaierror, 1906 self.exceptions = [httplib.HTTPException, socket.error, socket.gaierror,
1469 socket.timeout, httplib.BadStatusLine, 1907 socket.timeout, httplib.BadStatusLine,
1470 ServiceException] 1908 ServiceException]
1471 1909
1472 # Command entry point. 1910 # Command entry point.
1473 def RunCommand(self): 1911 def RunCommand(self):
1474 """Called by gsutil when the command is being invoked.""" 1912 """Called by gsutil when the command is being invoked."""
1475 self._ParseArgs() 1913 self._ParseArgs()
1476 1914
1477 if self.input_file: 1915 if self.input_file:
1478 self._DisplayResults() 1916 self._DisplayResults()
1479 return 0 1917 return 0
1480 1918
1481 # We turn off retries in the underlying boto library because the 1919 # We turn off retries in the underlying boto library because the
1482 # _RunOperation function handles errors manually so it can count them. 1920 # _RunOperation function handles errors manually so it can count them.
1483 boto.config.set('Boto', 'num_retries', '0') 1921 boto.config.set('Boto', 'num_retries', '0')
1484 1922
1485 self.logger.info( 1923 self.logger.info(
1486 'Number of iterations to run: %d\n' 1924 'Number of iterations to run: %d\n'
1487 'Base bucket URI: %s\n' 1925 'Base bucket URI: %s\n'
1488 'Number of processes: %d\n' 1926 'Number of processes: %d\n'
1489 'Number of threads: %d\n' 1927 'Number of threads: %d\n'
1928 'Parallelism strategy: %s\n'
1490 'Throughput file size: %s\n' 1929 'Throughput file size: %s\n'
1491 'Diagnostics to run: %s', 1930 'Diagnostics to run: %s',
1492 self.num_iterations, 1931 self.num_objects,
1493 self.bucket_url, 1932 self.bucket_url,
1494 self.processes, 1933 self.processes,
1495 self.threads, 1934 self.threads,
1935 self.parallel_strategy,
1496 MakeHumanReadable(self.thru_filesize), 1936 MakeHumanReadable(self.thru_filesize),
1497 (', '.join(self.diag_tests))) 1937 (', '.join(self.diag_tests)))
1498 1938
1499 try: 1939 try:
1500 self._SetUp() 1940 self._SetUp()
1501 1941
1502 # Collect generic system info. 1942 # Collect generic system info.
1503 self._CollectSysInfo() 1943 self._CollectSysInfo()
1504 # Collect netstat info and disk counters before tests (and again later). 1944 # Collect netstat info and disk counters before tests (and again later).
1505 netstat_output = self._GetTcpStats() 1945 netstat_output = self._GetTcpStats()
1506 if netstat_output: 1946 if netstat_output:
1507 self.results['sysinfo']['netstat_start'] = netstat_output 1947 self.results['sysinfo']['netstat_start'] = netstat_output
1508 if IS_LINUX: 1948 if IS_LINUX:
1509 self.results['sysinfo']['disk_counters_start'] = self._GetDiskCounters() 1949 self.results['sysinfo']['disk_counters_start'] = self._GetDiskCounters()
1510 # Record bucket URL. 1950 # Record bucket URL.
1511 self.results['bucket_uri'] = str(self.bucket_url) 1951 self.results['bucket_uri'] = str(self.bucket_url)
1512 self.results['json_format'] = 'perfdiag' 1952 self.results['json_format'] = 'perfdiag'
1513 self.results['metadata'] = self.metadata_keys 1953 self.results['metadata'] = self.metadata_keys
1514 1954
1515 if self.LAT in self.diag_tests: 1955 if self.LAT in self.diag_tests:
1516 self._RunLatencyTests() 1956 self._RunLatencyTests()
1517 if self.RTHRU in self.diag_tests: 1957 if self.RTHRU in self.diag_tests:
1518 self._RunReadThruTests() 1958 self._RunReadThruTests()
1959 # Run WTHRU_FILE before RTHRU_FILE. If data is created in WTHRU_FILE it
1960 # will be used in RTHRU_FILE to save time and bandwidth.
1961 if self.WTHRU_FILE in self.diag_tests:
1962 self._RunWriteThruTests(use_file=True)
1963 if self.RTHRU_FILE in self.diag_tests:
1964 self._RunReadThruTests(use_file=True)
1519 if self.WTHRU in self.diag_tests: 1965 if self.WTHRU in self.diag_tests:
1520 self._RunWriteThruTests() 1966 self._RunWriteThruTests()
1521 if self.LIST in self.diag_tests: 1967 if self.LIST in self.diag_tests:
1522 self._RunListTests() 1968 self._RunListTests()
1523 1969
1524 # Collect netstat info and disk counters after tests. 1970 # Collect netstat info and disk counters after tests.
1525 netstat_output = self._GetTcpStats() 1971 netstat_output = self._GetTcpStats()
1526 if netstat_output: 1972 if netstat_output:
1527 self.results['sysinfo']['netstat_end'] = netstat_output 1973 self.results['sysinfo']['netstat_end'] = netstat_output
1528 if IS_LINUX: 1974 if IS_LINUX:
1529 self.results['sysinfo']['disk_counters_end'] = self._GetDiskCounters() 1975 self.results['sysinfo']['disk_counters_end'] = self._GetDiskCounters()
1530 1976
1531 self.results['total_requests'] = self.total_requests 1977 self.results['total_requests'] = self.total_requests
1532 self.results['request_errors'] = self.request_errors 1978 self.results['request_errors'] = self.request_errors
1533 self.results['error_responses_by_code'] = self.error_responses_by_code 1979 self.results['error_responses_by_code'] = self.error_responses_by_code
1534 self.results['connection_breaks'] = self.connection_breaks 1980 self.results['connection_breaks'] = self.connection_breaks
1535 self.results['gsutil_version'] = gslib.VERSION 1981 self.results['gsutil_version'] = gslib.VERSION
1536 self.results['boto_version'] = boto.__version__ 1982 self.results['boto_version'] = boto.__version__
1537 1983
1984 self._TearDown()
1538 self._DisplayResults() 1985 self._DisplayResults()
1539 finally: 1986 finally:
1540 # TODO: Install signal handlers so this is performed in response to a 1987 # TODO: Install signal handlers so this is performed in response to a
1541 # terminating signal; consider multi-threaded object deletes during 1988 # terminating signal; consider multi-threaded object deletes during
1542 # cleanup so it happens quickly. 1989 # cleanup so it happens quickly.
1543 self._TearDown() 1990 self._TearDown()
1544 1991
1545 return 0 1992 return 0
1546 1993
1547 1994
1548 class UploadObjectTuple(object):
1549 """Picklable tuple with necessary metadata for an insert object call."""
1550
1551 def __init__(self, bucket_name, object_name, filepath=None, md5=None,
1552 contents=None):
1553 """Create an upload tuple.
1554
1555 Args:
1556 bucket_name: Name of the bucket to upload to.
1557 object_name: Name of the object to upload to.
1558 filepath: A file path located in self.file_contents and self.file_md5s.
1559 md5: The MD5 hash of the object being uploaded.
1560 contents: The contents of the file to be uploaded.
1561
1562 Note: (contents + md5) and filepath are mutually exlusive. You may specify
1563 one or the other, but not both.
1564 Note: If one of contents or md5 are specified, they must both be specified.
1565
1566 Raises:
1567 InvalidArgument: if the arguments are invalid.
1568 """
1569 self.bucket_name = bucket_name
1570 self.object_name = object_name
1571 self.filepath = filepath
1572 self.md5 = md5
1573 self.contents = contents
1574 if filepath and (md5 or contents is not None):
1575 raise InvalidArgument(
1576 'Only one of filepath or (md5 + contents) may be specified.')
1577 if not filepath and (not md5 or contents is None):
1578 raise InvalidArgument(
1579 'Both md5 and contents must be specified.')
1580
1581
1582 def StorageUrlToUploadObjectMetadata(storage_url): 1995 def StorageUrlToUploadObjectMetadata(storage_url):
1583 if storage_url.IsCloudUrl() and storage_url.IsObject(): 1996 if storage_url.IsCloudUrl() and storage_url.IsObject():
1584 upload_target = apitools_messages.Object() 1997 upload_target = apitools_messages.Object()
1585 upload_target.name = storage_url.object_name 1998 upload_target.name = storage_url.object_name
1586 upload_target.bucket = storage_url.bucket_name 1999 upload_target.bucket = storage_url.bucket_name
1587 return upload_target 2000 return upload_target
1588 else: 2001 else:
1589 raise CommandException('Non-cloud URL upload target %s was created in ' 2002 raise CommandException('Non-cloud URL upload target %s was created in '
1590 'perfdiag implemenation.' % storage_url) 2003 'perfdiag implemenation.' % storage_url)
OLDNEW
« no previous file with comments | « third_party/gsutil/gslib/commands/mb.py ('k') | third_party/gsutil/gslib/commands/rb.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698