OLD | NEW |
1 # -*- coding: utf-8 -*- | 1 # -*- coding: utf-8 -*- |
2 # Copyright 2012 Google Inc. All Rights Reserved. | 2 # Copyright 2012 Google Inc. All Rights Reserved. |
3 # | 3 # |
4 # Licensed under the Apache License, Version 2.0 (the "License"); | 4 # Licensed under the Apache License, Version 2.0 (the "License"); |
5 # you may not use this file except in compliance with the License. | 5 # you may not use this file except in compliance with the License. |
6 # You may obtain a copy of the License at | 6 # You may obtain a copy of the License at |
7 # | 7 # |
8 # http://www.apache.org/licenses/LICENSE-2.0 | 8 # http://www.apache.org/licenses/LICENSE-2.0 |
9 # | 9 # |
10 # Unless required by applicable law or agreed to in writing, software | 10 # Unless required by applicable law or agreed to in writing, software |
11 # distributed under the License is distributed on an "AS IS" BASIS, | 11 # distributed under the License is distributed on an "AS IS" BASIS, |
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 # See the License for the specific language governing permissions and | 13 # See the License for the specific language governing permissions and |
14 # limitations under the License. | 14 # limitations under the License. |
15 """Contains the perfdiag gsutil command.""" | 15 """Contains the perfdiag gsutil command.""" |
16 | 16 |
17 from __future__ import absolute_import | 17 from __future__ import absolute_import |
18 | 18 |
19 import calendar | 19 import calendar |
20 from collections import defaultdict | 20 from collections import defaultdict |
| 21 from collections import namedtuple |
21 import contextlib | 22 import contextlib |
22 import cStringIO | 23 import cStringIO |
23 import datetime | 24 import datetime |
24 import httplib | 25 import httplib |
25 import json | 26 import json |
26 import logging | 27 import logging |
27 import math | 28 import math |
28 import multiprocessing | 29 import multiprocessing |
29 import os | 30 import os |
30 import random | 31 import random |
31 import re | 32 import re |
32 import socket | 33 import socket |
33 import string | 34 import string |
34 import subprocess | 35 import subprocess |
35 import tempfile | 36 import tempfile |
36 import time | 37 import time |
37 | 38 |
38 import boto | 39 import boto |
39 import boto.gs.connection | 40 import boto.gs.connection |
40 | 41 |
41 import gslib | 42 import gslib |
42 from gslib.cloud_api import NotFoundException | 43 from gslib.cloud_api import NotFoundException |
43 from gslib.cloud_api import ServiceException | 44 from gslib.cloud_api import ServiceException |
44 from gslib.cloud_api_helper import GetDownloadSerializationDict | 45 from gslib.cloud_api_helper import GetDownloadSerializationData |
45 from gslib.command import Command | 46 from gslib.command import Command |
46 from gslib.command import DummyArgChecker | 47 from gslib.command import DummyArgChecker |
47 from gslib.command_argument import CommandArgument | 48 from gslib.command_argument import CommandArgument |
48 from gslib.commands import config | 49 from gslib.commands import config |
49 from gslib.cs_api_map import ApiSelector | 50 from gslib.cs_api_map import ApiSelector |
50 from gslib.exception import CommandException | 51 from gslib.exception import CommandException |
| 52 from gslib.file_part import FilePart |
51 from gslib.hashing_helper import CalculateB64EncodedMd5FromContents | 53 from gslib.hashing_helper import CalculateB64EncodedMd5FromContents |
52 from gslib.storage_url import StorageUrlFromString | 54 from gslib.storage_url import StorageUrlFromString |
53 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages | 55 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages |
| 56 from gslib.util import CheckFreeSpace |
| 57 from gslib.util import DivideAndCeil |
54 from gslib.util import GetCloudApiInstance | 58 from gslib.util import GetCloudApiInstance |
| 59 from gslib.util import GetFileSize |
55 from gslib.util import GetMaxRetryDelay | 60 from gslib.util import GetMaxRetryDelay |
56 from gslib.util import HumanReadableToBytes | 61 from gslib.util import HumanReadableToBytes |
57 from gslib.util import IS_LINUX | 62 from gslib.util import IS_LINUX |
58 from gslib.util import MakeBitsHumanReadable | 63 from gslib.util import MakeBitsHumanReadable |
59 from gslib.util import MakeHumanReadable | 64 from gslib.util import MakeHumanReadable |
60 from gslib.util import Percentile | 65 from gslib.util import Percentile |
61 from gslib.util import ResumableThreshold | 66 from gslib.util import ResumableThreshold |
62 | 67 |
63 | |
64 _SYNOPSIS = """ | 68 _SYNOPSIS = """ |
65 gsutil perfdiag [-i in.json] | 69 gsutil perfdiag [-i in.json] |
66 gsutil perfdiag [-o out.json] [-n iterations] [-c processes] | 70 gsutil perfdiag [-o out.json] [-n objects] [-c processes] |
67 [-k threads] [-s size] [-t tests] url... | 71 [-k threads] [-p parallelism type] [-y slices] [-s size] [-d directory] |
| 72 [-t tests] url... |
68 """ | 73 """ |
69 | 74 |
70 _DETAILED_HELP_TEXT = (""" | 75 _DETAILED_HELP_TEXT = (""" |
71 <B>SYNOPSIS</B> | 76 <B>SYNOPSIS</B> |
72 """ + _SYNOPSIS + """ | 77 """ + _SYNOPSIS + """ |
73 | 78 |
74 | 79 |
75 <B>DESCRIPTION</B> | 80 <B>DESCRIPTION</B> |
76 The perfdiag command runs a suite of diagnostic tests for a given Google | 81 The perfdiag command runs a suite of diagnostic tests for a given Google |
77 Storage bucket. | 82 Storage bucket. |
(...skipping 14 matching lines...) |
92 | 97 |
93 <B>PROVIDING DIAGNOSTIC OUTPUT TO GOOGLE CLOUD STORAGE TEAM</B> | 98 <B>PROVIDING DIAGNOSTIC OUTPUT TO GOOGLE CLOUD STORAGE TEAM</B> |
94 If the Google Cloud Storage Team asks you to run a performance diagnostic | 99 If the Google Cloud Storage Team asks you to run a performance diagnostic |
95 please use the following command, and email the output file (output.json) | 100 please use the following command, and email the output file (output.json) |
96 to gs-team@google.com: | 101 to gs-team@google.com: |
97 | 102 |
98 gsutil perfdiag -o output.json gs://your-bucket | 103 gsutil perfdiag -o output.json gs://your-bucket |
99 | 104 |
100 | 105 |
101 <B>OPTIONS</B> | 106 <B>OPTIONS</B> |
102 -n Sets the number of iterations performed when downloading and | 107 -n Sets the number of objects to use when downloading and uploading |
103 uploading files during latency and throughput tests. Defaults to | 108 files during tests. Defaults to 5. |
104 5. | |
105 | 109 |
106 -c Sets the number of processes to use while running throughput | 110 -c Sets the number of processes to use while running throughput |
107 experiments. The default value is 1. | 111 experiments. The default value is 1. |
108 | 112 |
109 -k Sets the number of threads per process to use while running | 113 -k Sets the number of threads per process to use while running |
110 throughput experiments. Each process will receive an equal number | 114 throughput experiments. Each process will receive an equal number |
111 of threads. The default value is 1. | 115 of threads. The default value is 1. |
112 | 116 |
113 -s Sets the size (in bytes) of the test file used to perform read | 117 Note: All specified threads and processes will be created, but may |
114 and write throughput tests. The default is 1 MiB. This can also | 118 not be saturated with work if too few objects (specified with -n) |
115 be specified using byte suffixes such as 500K or 1M. Note: these | 119 and too few components (specified with -y) are specified. |
116 values are interpreted as multiples of 1024 (K=1024, M=1024*1024, | 120 |
117 etc.) | 121 -p Sets the type of parallelism to be used (only applicable when |
| 122 threads or processes are specified and threads * processes > 1). |
| 123 The default is to use fan. Must be one of the following: |
| 124 |
| 125 fan |
| 126 Use one thread per object. This is akin to using gsutil -m cp, |
| 127 with sliced object download / parallel composite upload |
| 128 disabled. |
| 129 |
| 130 slice |
| 131 Use Y (specified with -y) threads for each object, transferring |
| 132 one object at a time. This is akin to using parallel object |
| 133 download / parallel composite upload, without -m. Sliced |
| 134 uploads are not supported for S3. |
| 135 |
| 136 both |
| 137 Use Y (specified with -y) threads for each object, transferring |
| 138 multiple objects at a time. This is akin to simultaneously |
| 139 using sliced object download / parallel composite upload and |
| 140 gsutil -m cp. Sliced uploads are not supported for S3. |
| 141 |
| 142 -y Sets the number of slices to divide each file/object into while |
| 143 transferring data. Only applicable with the slice (or both) |
| 144 parallelism type. The default is 4 slices. |
| 145 |
| 146 -s Sets the size (in bytes) for each of the N (set with -n) objects |
| 147 used in the read and write throughput tests. The default is 1 MiB. |
| 148 This can also be specified using byte suffixes such as 500K or 1M. |
| 149 Note: these values are interpreted as multiples of 1024 (K=1024, |
| 150 M=1024*1024, etc.) |
| 151 Note: If rthru_file or wthru_file are performed, N (set with -n) |
| 152 times as much disk space as specified will be required for the |
| 153 operation. |
| 154 |
| 155 -d Sets the directory to store temporary local files in. If not |
| 156 specified, a default temporary directory will be used. |
118 | 157 |
119 -t Sets the list of diagnostic tests to perform. The default is to | 158 -t Sets the list of diagnostic tests to perform. The default is to |
120 run all diagnostic tests. Must be a comma-separated list | 159 run the lat, rthru, and wthru diagnostic tests. Must be a |
121 containing one or more of the following: | 160 comma-separated list containing one or more of the following: |
122 | 161 |
123 lat | 162 lat |
124 Runs N iterations (set with -n) of writing the file, | 163 For N (set with -n) objects, write the object, retrieve its |
125 retrieving its metadata, reading the file, and deleting | 164 metadata, read the object, and finally delete the object. |
126 the file. Records the latency of each operation. | 165 Record the latency of each operation. |
127 | 166 |
128 list | 167 list |
129 Write N (set with -n) objects to the bucket, record how long | 168 Write N (set with -n) objects to the bucket, record how long |
130 it takes for the eventually consistent listing call to return | 169 it takes for the eventually consistent listing call to return |
131 the N objects in its result, delete the N objects, then record | 170 the N objects in its result, delete the N objects, then record |
132 how long it takes listing to stop returning the N objects. | 171 how long it takes listing to stop returning the N objects. |
133 This test is off by default. | 172 This test is off by default. |
134 | 173 |
135 rthru | 174 rthru |
136 Runs N (set with -n) read operations, with at most C | 175 Runs N (set with -n) read operations, with at most C |
137 (set with -c) reads outstanding at any given time. | 176 (set with -c) reads outstanding at any given time. |
138 | 177 |
| 178 rthru_file |
| 179 The same as rthru, but simultaneously writes data to the disk, |
| 180 to gauge the performance impact of the local disk on downloads. |
| 181 |
139 wthru | 182 wthru |
140 Runs N (set with -n) write operations, with at most C | 183 Runs N (set with -n) write operations, with at most C |
141 (set with -c) writes outstanding at any given time. | 184 (set with -c) writes outstanding at any given time. |
142 | 185 |
| 186 wthru_file |
| 187 The same as wthru, but simultaneously reads data from the disk, |
| 188 to gauge the performance impact of the local disk on uploads. |
| 189 |
143 -m Adds metadata to the result JSON file. Multiple -m values can be | 190 -m Adds metadata to the result JSON file. Multiple -m values can be |
144 specified. Example: | 191 specified. Example: |
145 | 192 |
146 gsutil perfdiag -m "key1:value1" -m "key2:value2" \ | 193 gsutil perfdiag -m "key1:val1" -m "key2:val2" gs://bucketname |
147 gs://bucketname/ | |
148 | 194 |
149 Each metadata key will be added to the top-level "metadata" | 195 Each metadata key will be added to the top-level "metadata" |
150 dictionary in the output JSON file. | 196 dictionary in the output JSON file. |
151 | 197 |
152 -o Writes the results of the diagnostic to an output file. The output | 198 -o Writes the results of the diagnostic to an output file. The output |
153 is a JSON file containing system information and performance | 199 is a JSON file containing system information and performance |
154 diagnostic results. The file can be read and reported later using | 200 diagnostic results. The file can be read and reported later using |
155 the -i option. | 201 the -i option. |
156 | 202 |
157 -i Reads the JSON output file created using the -o command and prints | 203 -i Reads the JSON output file created using the -o command and prints |
(...skipping 13 matching lines...) |
171 | 217 |
172 | 218 |
173 <B>NOTE</B> | 219 <B>NOTE</B> |
174 The perfdiag command collects system information. It collects your IP address, | 220 The perfdiag command collects system information. It collects your IP address, |
175 executes DNS queries to Google servers and collects the results, and collects | 221 executes DNS queries to Google servers and collects the results, and collects |
176 network statistics information from the output of netstat -s. It will also | 222 network statistics information from the output of netstat -s. It will also |
177 attempt to connect to your proxy server if you have one configured. None of | 223 attempt to connect to your proxy server if you have one configured. None of |
178 this information will be sent to Google unless you choose to send it. | 224 this information will be sent to Google unless you choose to send it. |
179 """) | 225 """) |
180 | 226 |
| 227 FileDataTuple = namedtuple( |
| 228 'FileDataTuple', |
| 229 'size md5 data') |
| 230 |
| 231 # Describes one object in a fanned download. If need_to_slice is specified as |
| 232 # True, the object should be downloaded with the slice strategy. Other field |
| 233 # names are the same as documented in PerfDiagCommand.Download. |
| 234 FanDownloadTuple = namedtuple( |
| 235 'FanDownloadTuple', |
| 236 'need_to_slice object_name file_name serialization_data') |
| 237 |
| 238 # Describes one slice in a sliced download. |
| 239 # Field names are the same as documented in PerfDiagCommand.Download. |
| 240 SliceDownloadTuple = namedtuple( |
| 241 'SliceDownloadTuple', |
| 242 'object_name file_name serialization_data start_byte end_byte') |
| 243 |
| 244 # Describes one file in a fanned upload. If need_to_slice is specified as |
| 245 # True, the file should be uploaded with the slice strategy. Other field |
| 246 # names are the same as documented in PerfDiagCommand.Upload. |
| 247 FanUploadTuple = namedtuple( |
| 248 'FanUploadTuple', |
| 249 'need_to_slice file_name object_name use_file') |
| 250 |
| 251 # Describes one slice in a sliced upload. |
| 252 # Field names are the same as documented in PerfDiagCommand.Upload. |
| 253 SliceUploadTuple = namedtuple( |
| 254 'SliceUploadTuple', |
| 255 'file_name object_name use_file file_start file_size') |
| 256 |
| 257 # Dict storing file_path:FileDataTuple for each temporary file used by |
| 258 # perfdiag. This data should be kept outside of the PerfDiagCommand class |
| 259 # since calls to Apply will make copies of all member data. |
| 260 temp_file_dict = {} |
| 261 |
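A minimal sketch of how these pieces fit together (the path and contents below are illustrative, not taken from the patch): Apply hands work to separate processes that receive copies of the command object, so per-file metadata is registered in this module-level dict and looked up by path.

    # Illustrative only; assumes the FileDataTuple and temp_file_dict defined above.
    import base64
    import hashlib

    fake_path = '/tmp/gsutil_test_file_example.bin'  # hypothetical path
    payload = 'x' * 1024
    md5 = base64.b64encode(hashlib.md5(payload).digest())
    temp_file_dict[fake_path] = FileDataTuple(len(payload), md5, payload)
    # Any worker can now read temp_file_dict[fake_path].size / .md5 / .data.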
181 | 262 |
182 class Error(Exception): | 263 class Error(Exception): |
183 """Base exception class for this module.""" | 264 """Base exception class for this module.""" |
184 pass | 265 pass |
185 | 266 |
186 | 267 |
187 class InvalidArgument(Error): | 268 class InvalidArgument(Error): |
188 """Raised on invalid arguments to functions.""" | 269 """Raised on invalid arguments to functions.""" |
189 pass | 270 pass |
190 | 271 |
191 | 272 |
192 def _DownloadWrapper(cls, arg, thread_state=None): | 273 def _DownloadObject(cls, args, thread_state=None): |
193 cls.Download(arg, thread_state=thread_state) | 274 """Function argument to apply for performing fanned parallel downloads. |
| 275 |
| 276 Args: |
| 277 cls: The calling PerfDiagCommand class instance. |
| 278 args: A FanDownloadTuple object describing this download. |
| 279 thread_state: gsutil Cloud API instance to use for the operation. |
| 280 """ |
| 281 cls.gsutil_api = GetCloudApiInstance(cls, thread_state) |
| 282 if args.need_to_slice: |
| 283 cls.PerformSlicedDownload(args.object_name, args.file_name, |
| 284 args.serialization_data) |
| 285 else: |
| 286 cls.Download(args.object_name, args.file_name, args.serialization_data) |
194 | 287 |
195 | 288 |
196 def _UploadWrapper(cls, arg, thread_state=None): | 289 def _DownloadSlice(cls, args, thread_state=None): |
197 cls.Upload(arg, thread_state=thread_state) | 290 """Function argument to apply for performing sliced downloads. |
| 291 |
| 292 Args: |
| 293 cls: The calling PerfDiagCommand class instance. |
| 294 args: A SliceDownloadTuple object describing this download. |
| 295 thread_state: gsutil Cloud API instance to use for the operation. |
| 296 """ |
| 297 cls.gsutil_api = GetCloudApiInstance(cls, thread_state) |
| 298 cls.Download(args.object_name, args.file_name, args.serialization_data, |
| 299 args.start_byte, args.end_byte) |
198 | 300 |
199 | 301 |
200 def _DeleteWrapper(cls, arg, thread_state=None): | 302 def _UploadObject(cls, args, thread_state=None): |
201 cls.Delete(arg, thread_state=thread_state) | 303 """Function argument to apply for performing fanned parallel uploads. |
| 304 |
| 305 Args: |
| 306 cls: The calling PerfDiagCommand class instance. |
| 307 args: A FanUploadTuple object describing this upload. |
| 308 thread_state: gsutil Cloud API instance to use for the operation. |
| 309 """ |
| 310 cls.gsutil_api = GetCloudApiInstance(cls, thread_state) |
| 311 if args.need_to_slice: |
| 312 cls.PerformSlicedUpload(args.file_name, args.object_name, args.use_file) |
| 313 else: |
| 314 cls.Upload(args.file_name, args.object_name, args.use_file) |
| 315 |
| 316 |
| 317 def _UploadSlice(cls, args, thread_state=None): |
| 318 """Function argument to apply for performing sliced parallel uploads. |
| 319 |
| 320 Args: |
| 321 cls: The calling PerfDiagCommand class instance. |
| 322 args: A SliceUploadTuple object describing this upload. |
| 323 thread_state: gsutil Cloud API instance to use for the operation. |
| 324 """ |
| 325 cls.gsutil_api = GetCloudApiInstance(cls, thread_state) |
| 326 cls.Upload(args.file_name, args.object_name, args.use_file, |
| 327 args.file_start, args.file_size) |
| 328 |
| 329 |
| 330 def _DeleteWrapper(cls, object_name, thread_state=None): |
| 331 """Function argument to apply for performing parallel object deletions. |
| 332 |
| 333 Args: |
| 334 cls: The calling PerfDiagCommand class instance. |
| 335 object_name: The object name to delete from the test bucket. |
| 336 thread_state: gsutil Cloud API instance to use for the operation. |
| 337 """ |
| 338 cls.gsutil_api = GetCloudApiInstance(cls, thread_state) |
| 339 cls.Delete(object_name) |
202 | 340 |
203 | 341 |
204 def _PerfdiagExceptionHandler(cls, e): | 342 def _PerfdiagExceptionHandler(cls, e): |
205 """Simple exception handler to allow post-completion status.""" | 343 """Simple exception handler to allow post-completion status.""" |
206 cls.logger.error(str(e)) | 344 cls.logger.error(str(e)) |
207 | 345 |
208 | 346 |
209 def _DummyTrackerCallback(_): | 347 def _DummyTrackerCallback(_): |
210 pass | 348 pass |
211 | 349 |
212 | 350 |
213 class DummyFile(object): | 351 class DummyFile(object): |
214 """A dummy, file-like object that throws away everything written to it.""" | 352 """A dummy, file-like object that throws away everything written to it.""" |
215 | 353 |
216 def write(self, *args, **kwargs): # pylint: disable=invalid-name | 354 def write(self, *args, **kwargs): # pylint: disable=invalid-name |
217 pass | 355 pass |
218 | 356 |
| 357 def close(self): # pylint: disable=invalid-name |
| 358 pass |
| 359 |
219 | 360 |
220 # Many functions in perfdiag re-define a temporary function based on a | 361 # Many functions in perfdiag re-define a temporary function based on a |
221 # variable from a loop, resulting in a false positive from the linter. | 362 # variable from a loop, resulting in a false positive from the linter. |
222 # pylint: disable=cell-var-from-loop | 363 # pylint: disable=cell-var-from-loop |
223 class PerfDiagCommand(Command): | 364 class PerfDiagCommand(Command): |
224 """Implementation of gsutil perfdiag command.""" | 365 """Implementation of gsutil perfdiag command.""" |
225 | 366 |
226 # Command specification. See base class for documentation. | 367 # Command specification. See base class for documentation. |
227 command_spec = Command.CreateCommandSpec( | 368 command_spec = Command.CreateCommandSpec( |
228 'perfdiag', | 369 'perfdiag', |
229 command_name_aliases=['diag', 'diagnostic', 'perf', 'performance'], | 370 command_name_aliases=['diag', 'diagnostic', 'perf', 'performance'], |
230 usage_synopsis=_SYNOPSIS, | 371 usage_synopsis=_SYNOPSIS, |
231 min_args=0, | 372 min_args=0, |
232 max_args=1, | 373 max_args=1, |
233 supported_sub_args='n:c:k:s:t:m:i:o:', | 374 supported_sub_args='n:c:k:p:y:s:d:t:m:i:o:', |
234 file_url_ok=False, | 375 file_url_ok=False, |
235 provider_url_ok=False, | 376 provider_url_ok=False, |
236 urls_start_arg=0, | 377 urls_start_arg=0, |
237 gs_api_support=[ApiSelector.XML, ApiSelector.JSON], | 378 gs_api_support=[ApiSelector.XML, ApiSelector.JSON], |
238 gs_default_api=ApiSelector.JSON, | 379 gs_default_api=ApiSelector.JSON, |
239 argparse_arguments=[ | 380 argparse_arguments=[ |
240 CommandArgument.MakeNCloudBucketURLsArgument(1) | 381 CommandArgument.MakeNCloudBucketURLsArgument(1) |
241 ] | 382 ] |
242 ) | 383 ) |
243 # Help specification. See help_provider.py for documentation. | 384 # Help specification. See help_provider.py for documentation. |
244 help_spec = Command.HelpSpec( | 385 help_spec = Command.HelpSpec( |
245 help_name='perfdiag', | 386 help_name='perfdiag', |
246 help_name_aliases=[], | 387 help_name_aliases=[], |
247 help_type='command_help', | 388 help_type='command_help', |
248 help_one_line_summary='Run performance diagnostic', | 389 help_one_line_summary='Run performance diagnostic', |
249 help_text=_DETAILED_HELP_TEXT, | 390 help_text=_DETAILED_HELP_TEXT, |
250 subcommand_help_text={}, | 391 subcommand_help_text={}, |
251 ) | 392 ) |
252 | 393 |
253 # Byte sizes to use for latency testing files. | 394 # Byte sizes to use for latency testing files. |
254 # TODO: Consider letting the user specify these sizes with a configuration | 395 # TODO: Consider letting the user specify these sizes with a configuration |
255 # parameter. | 396 # parameter. |
256 test_file_sizes = ( | 397 test_lat_file_sizes = ( |
257 0, # 0 bytes | 398 0, # 0 bytes |
258 1024, # 1 KiB | 399 1024, # 1 KiB |
259 102400, # 100 KiB | 400 102400, # 100 KiB |
260 1048576, # 1 MiB | 401 1048576, # 1 MiB |
261 ) | 402 ) |
262 | 403 |
263 # Test names. | 404 # Test names. |
264 RTHRU = 'rthru' | 405 RTHRU = 'rthru' |
| 406 RTHRU_FILE = 'rthru_file' |
265 WTHRU = 'wthru' | 407 WTHRU = 'wthru' |
| 408 WTHRU_FILE = 'wthru_file' |
266 LAT = 'lat' | 409 LAT = 'lat' |
267 LIST = 'list' | 410 LIST = 'list' |
268 | 411 |
| 412 # Parallelism strategies. |
| 413 FAN = 'fan' |
| 414 SLICE = 'slice' |
| 415 BOTH = 'both' |
| 416 |
269 # List of all diagnostic tests. | 417 # List of all diagnostic tests. |
270 ALL_DIAG_TESTS = (RTHRU, WTHRU, LAT, LIST) | 418 ALL_DIAG_TESTS = (RTHRU, RTHRU_FILE, WTHRU, WTHRU_FILE, LAT, LIST) |
| 419 |
271 # List of diagnostic tests to run by default. | 420 # List of diagnostic tests to run by default. |
272 DEFAULT_DIAG_TESTS = (RTHRU, WTHRU, LAT) | 421 DEFAULT_DIAG_TESTS = (RTHRU, WTHRU, LAT) |
273 | 422 |
| 423 # List of parallelism strategies. |
| 424 PARALLEL_STRATEGIES = (FAN, SLICE, BOTH) |
| 425 |
274 # Google Cloud Storage XML API endpoint host. | 426 # Google Cloud Storage XML API endpoint host. |
275 XML_API_HOST = boto.config.get( | 427 XML_API_HOST = boto.config.get( |
276 'Credentials', 'gs_host', boto.gs.connection.GSConnection.DefaultHost) | 428 'Credentials', 'gs_host', boto.gs.connection.GSConnection.DefaultHost) |
277 # Google Cloud Storage XML API endpoint port. | 429 # Google Cloud Storage XML API endpoint port. |
278 XML_API_PORT = boto.config.get('Credentials', 'gs_port', 80) | 430 XML_API_PORT = boto.config.get('Credentials', 'gs_port', 80) |
279 | 431 |
280 # Maximum number of times to retry requests on 5xx errors. | 432 # Maximum number of times to retry requests on 5xx errors. |
281 MAX_SERVER_ERROR_RETRIES = 5 | 433 MAX_SERVER_ERROR_RETRIES = 5 |
282 # Maximum number of times to retry requests on more serious errors like | 434 # Maximum number of times to retry requests on more serious errors like |
283 # the socket breaking. | 435 # the socket breaking. |
(...skipping 33 matching lines...) |
317 """ | 469 """ |
318 self.logger.debug('Running command: %s', cmd) | 470 self.logger.debug('Running command: %s', cmd) |
319 stderr = subprocess.PIPE if mute_stderr else None | 471 stderr = subprocess.PIPE if mute_stderr else None |
320 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=stderr) | 472 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=stderr) |
321 (stdoutdata, _) = p.communicate() | 473 (stdoutdata, _) = p.communicate() |
322 if raise_on_error and p.returncode: | 474 if raise_on_error and p.returncode: |
323 raise CommandException("Received non-zero return code (%d) from " | 475 raise CommandException("Received non-zero return code (%d) from " |
324 "subprocess '%s'." % (p.returncode, ' '.join(cmd))) | 476 "subprocess '%s'." % (p.returncode, ' '.join(cmd))) |
325 return stdoutdata if return_output else p.returncode | 477 return stdoutdata if return_output else p.returncode |
326 | 478 |
| 479 def _WarnIfLargeData(self): |
| 480 """Outputs a warning message if a large amount of data is being used.""" |
| 481 if self.num_objects * self.thru_filesize > HumanReadableToBytes('2GiB'): |
| 482 self.logger.info('This is a large operation, and could take a while.') |
| 483 |
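For a sense of when this warning fires, a quick check with illustrative numbers (assuming the 2GiB threshold is 2 * 1024**3 bytes, per the 1024-multiple convention in the help text): five 500 MiB objects total roughly 2.44 GiB, which crosses the threshold.

    # Illustrative values only, not defaults from the patch.
    num_objects = 5
    thru_filesize = 500 * 1024 * 1024          # 500 MiB per object
    total = num_objects * thru_filesize        # 2,621,440,000 bytes (~2.44 GiB)
    would_warn = total > 2 * 1024 ** 3         # True, so the note above is logged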
| 484 def _MakeTempFile(self, file_size=0, mem_metadata=False, |
| 485 mem_data=False, prefix='gsutil_test_file'): |
| 486 """Creates a temporary file of the given size and returns its path. |
| 487 |
| 488 Args: |
| 489 file_size: The size of the temporary file to create. |
| 490 mem_metadata: If true, store md5 and file size in memory at |
| 491 temp_file_dict[fpath].md5 and temp_file_dict[fpath].size. |
| 492 mem_data: If true, store the file data in memory at |
| 493 temp_file_dict[fpath].data |
| 494 prefix: The prefix to use for the temporary file. Defaults to |
| 495 gsutil_test_file. |
| 496 |
| 497 Returns: |
| 498 The file path of the created temporary file. |
| 499 """ |
| 500 fd, fpath = tempfile.mkstemp(suffix='.bin', prefix=prefix, |
| 501 dir=self.directory, text=False) |
| 502 with os.fdopen(fd, 'wb') as fp: |
| 503 random_bytes = os.urandom(min(file_size, |
| 504 self.MAX_UNIQUE_RANDOM_BYTES)) |
| 505 total_bytes_written = 0 |
| 506 while total_bytes_written < file_size: |
| 507 num_bytes = min(self.MAX_UNIQUE_RANDOM_BYTES, |
| 508 file_size - total_bytes_written) |
| 509 fp.write(random_bytes[:num_bytes]) |
| 510 total_bytes_written += num_bytes |
| 511 |
| 512 if mem_metadata or mem_data: |
| 513 with open(fpath, 'rb') as fp: |
| 514 file_size = GetFileSize(fp) if mem_metadata else None |
| 515 md5 = CalculateB64EncodedMd5FromContents(fp) if mem_metadata else None |
| 516 data = fp.read() if mem_data else None |
| 517 temp_file_dict[fpath] = FileDataTuple(file_size, md5, data) |
| 518 |
| 519 self.temporary_files.add(fpath) |
| 520 return fpath |
| 521 |
327 def _SetUp(self): | 522 def _SetUp(self): |
328 """Performs setup operations needed before diagnostics can be run.""" | 523 """Performs setup operations needed before diagnostics can be run.""" |
329 | 524 |
330 # Stores test result data. | 525 # Stores test result data. |
331 self.results = {} | 526 self.results = {} |
332 # List of test files in a temporary location on disk for latency ops. | 527 # Set of file paths for local temporary files. |
333 self.latency_files = [] | 528 self.temporary_files = set() |
334 # List of test objects to clean up in the test bucket. | 529 # Set of names for test objects that exist in the test bucket. |
335 self.test_object_names = set() | 530 self.temporary_objects = set() |
336 # Maps each test file path to its size in bytes. | |
337 self.file_sizes = {} | |
338 # Maps each test file to its contents as a string. | |
339 self.file_contents = {} | |
340 # Maps each test file to its MD5 hash. | |
341 self.file_md5s = {} | |
342 # Total number of HTTP requests made. | 531 # Total number of HTTP requests made. |
343 self.total_requests = 0 | 532 self.total_requests = 0 |
344 # Total number of HTTP 5xx errors. | 533 # Total number of HTTP 5xx errors. |
345 self.request_errors = 0 | 534 self.request_errors = 0 |
346 # Number of responses, keyed by response code. | 535 # Number of responses, keyed by response code. |
347 self.error_responses_by_code = defaultdict(int) | 536 self.error_responses_by_code = defaultdict(int) |
348 # Total number of socket errors. | 537 # Total number of socket errors. |
349 self.connection_breaks = 0 | 538 self.connection_breaks = 0 |
| 539 # Boolean to prevent doing cleanup twice. |
| 540 self.teardown_completed = False |
350 | 541 |
351 def _MakeFile(file_size): | 542 # Create files for latency test. |
352 """Creates a temporary file of the given size and returns its path.""" | 543 if self.LAT in self.diag_tests: |
353 fd, fpath = tempfile.mkstemp(suffix='.bin', prefix='gsutil_test_file', | 544 self.latency_files = [] |
354 text=False) | 545 for file_size in self.test_lat_file_sizes: |
355 self.file_sizes[fpath] = file_size | 546 fpath = self._MakeTempFile(file_size, mem_metadata=True, mem_data=True) |
356 random_bytes = os.urandom(min(file_size, self.MAX_UNIQUE_RANDOM_BYTES)) | 547 self.latency_files.append(fpath) |
357 total_bytes = 0 | |
358 file_contents = '' | |
359 while total_bytes < file_size: | |
360 num_bytes = min(self.MAX_UNIQUE_RANDOM_BYTES, file_size - total_bytes) | |
361 file_contents += random_bytes[:num_bytes] | |
362 total_bytes += num_bytes | |
363 self.file_contents[fpath] = file_contents | |
364 with os.fdopen(fd, 'wb') as f: | |
365 f.write(self.file_contents[fpath]) | |
366 with open(fpath, 'rb') as f: | |
367 self.file_md5s[fpath] = CalculateB64EncodedMd5FromContents(f) | |
368 return fpath | |
369 | 548 |
370 # Create files for latency tests. | 549 # Create files for throughput tests. |
371 for file_size in self.test_file_sizes: | 550 if self.diag_tests.intersection( |
372 fpath = _MakeFile(file_size) | 551 (self.RTHRU, self.WTHRU, self.RTHRU_FILE, self.WTHRU_FILE)): |
373 self.latency_files.append(fpath) | 552 # Create a file for warming up the TCP connection. |
| 553 self.tcp_warmup_file = self._MakeTempFile( |
| 554 5 * 1024 * 1024, mem_metadata=True, mem_data=True) |
374 | 555 |
375 # Creating a file for warming up the TCP connection. | 556 # For in memory tests, throughput tests transfer the same object N times |
376 self.tcp_warmup_file = _MakeFile(5 * 1024 * 1024) # 5 Mebibytes. | 557 # instead of creating N objects, in order to avoid excessive memory usage. |
377 # Remote file to use for TCP warmup. | 558 if self.diag_tests.intersection((self.RTHRU, self.WTHRU)): |
378 self.tcp_warmup_remote_file = (str(self.bucket_url) + | 559 self.mem_thru_file_name = self._MakeTempFile( |
379 os.path.basename(self.tcp_warmup_file)) | 560 self.thru_filesize, mem_metadata=True, mem_data=True) |
| 561 self.mem_thru_object_name = os.path.basename(self.mem_thru_file_name) |
380 | 562 |
381 # Local file on disk for write throughput tests. | 563 # For tests that use disk I/O, it is necessary to create N objects in |
382 self.thru_local_file = _MakeFile(self.thru_filesize) | 564 # in order to properly measure the performance impact of seeks. |
| 565 if self.diag_tests.intersection((self.RTHRU_FILE, self.WTHRU_FILE)): |
| 566 # List of file names and corresponding object names to use for file |
| 567 # throughput tests. |
| 568 self.thru_file_names = [] |
| 569 self.thru_object_names = [] |
| 570 |
| 571 free_disk_space = CheckFreeSpace(self.directory) |
| 572 if free_disk_space >= self.thru_filesize * self.num_objects: |
| 573 self.logger.info('\nCreating %d local files each of size %s.' |
| 574 % (self.num_objects, |
| 575 MakeHumanReadable(self.thru_filesize))) |
| 576 self._WarnIfLargeData() |
| 577 for _ in range(self.num_objects): |
| 578 file_name = self._MakeTempFile(self.thru_filesize, |
| 579 mem_metadata=True) |
| 580 self.thru_file_names.append(file_name) |
| 581 self.thru_object_names.append(os.path.basename(file_name)) |
| 582 else: |
| 583 raise CommandException( |
| 584 'Not enough free disk space for throughput files: ' |
| 585 '%s of disk space required, but only %s available.' |
| 586 % (MakeHumanReadable(self.thru_filesize * self.num_objects), |
| 587 MakeHumanReadable(free_disk_space))) |
383 | 588 |
384 # Dummy file buffer to use for downloading that goes nowhere. | 589 # Dummy file buffer to use for downloading that goes nowhere. |
385 self.discard_sink = DummyFile() | 590 self.discard_sink = DummyFile() |
386 | 591 |
| 592 # Filter out misleading progress callback output and the incorrect |
| 593 # suggestion to use gsutil -m perfdiag. |
| 594 self.logger.addFilter(self._PerfdiagFilter()) |
| 595 |
387 def _TearDown(self): | 596 def _TearDown(self): |
388 """Performs operations to clean things up after performing diagnostics.""" | 597 """Performs operations to clean things up after performing diagnostics.""" |
389 for fpath in self.latency_files + [self.thru_local_file, | 598 if not self.teardown_completed: |
390 self.tcp_warmup_file]: | 599 temp_file_dict.clear() |
| 600 |
391 try: | 601 try: |
392 os.remove(fpath) | 602 for fpath in self.temporary_files: |
| 603 os.remove(fpath) |
| 604 if self.delete_directory: |
| 605 os.rmdir(self.directory) |
393 except OSError: | 606 except OSError: |
394 pass | 607 pass |
395 | 608 |
396 for object_name in self.test_object_names: | 609 if self.threads > 1 or self.processes > 1: |
397 | 610 args = [obj for obj in self.temporary_objects] |
398 def _Delete(): | 611 self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler, |
399 try: | 612 arg_checker=DummyArgChecker, |
400 self.gsutil_api.DeleteObject(self.bucket_url.bucket_name, | 613 parallel_operations_override=True, |
401 object_name, | 614 process_count=self.processes, thread_count=self.threads) |
402 provider=self.provider) | 615 else: |
403 except NotFoundException: | 616 for object_name in self.temporary_objects: |
404 pass | 617 self.Delete(object_name) |
405 | 618 self.teardown_completed = True |
406 self._RunOperation(_Delete) | |
407 | 619 |
408 @contextlib.contextmanager | 620 @contextlib.contextmanager |
409 def _Time(self, key, bucket): | 621 def _Time(self, key, bucket): |
410 """A context manager that measures time. | 622 """A context manager that measures time. |
411 | 623 |
412 A context manager that prints a status message before and after executing | 624 A context manager that prints a status message before and after executing |
413 the inner command and times how long the inner command takes. Keeps track of | 625 the inner command and times how long the inner command takes. Keeps track of |
414 the timing, aggregated by the given key. | 626 the timing, aggregated by the given key. |
415 | 627 |
416 Args: | 628 Args: |
(...skipping 54 matching lines...) |
471 break | 683 break |
472 else: | 684 else: |
473 self.connection_breaks += 1 | 685 self.connection_breaks += 1 |
474 return return_val | 686 return return_val |
475 | 687 |
476 def _RunLatencyTests(self): | 688 def _RunLatencyTests(self): |
477 """Runs latency tests.""" | 689 """Runs latency tests.""" |
478 # Stores timing information for each category of operation. | 690 # Stores timing information for each category of operation. |
479 self.results['latency'] = defaultdict(list) | 691 self.results['latency'] = defaultdict(list) |
480 | 692 |
481 for i in range(self.num_iterations): | 693 for i in range(self.num_objects): |
482 self.logger.info('\nRunning latency iteration %d...', i+1) | 694 self.logger.info('\nRunning latency iteration %d...', i+1) |
483 for fpath in self.latency_files: | 695 for fpath in self.latency_files: |
| 696 file_data = temp_file_dict[fpath] |
484 url = self.bucket_url.Clone() | 697 url = self.bucket_url.Clone() |
485 url.object_name = os.path.basename(fpath) | 698 url.object_name = os.path.basename(fpath) |
486 file_size = self.file_sizes[fpath] | 699 file_size = file_data.size |
487 readable_file_size = MakeHumanReadable(file_size) | 700 readable_file_size = MakeHumanReadable(file_size) |
488 | 701 |
489 self.logger.info( | 702 self.logger.info( |
490 "\nFile of size %s located on disk at '%s' being diagnosed in the " | 703 "\nFile of size %s located on disk at '%s' being diagnosed in the " |
491 "cloud at '%s'.", readable_file_size, fpath, url) | 704 "cloud at '%s'.", readable_file_size, fpath, url) |
492 | 705 |
493 upload_target = StorageUrlToUploadObjectMetadata(url) | 706 upload_target = StorageUrlToUploadObjectMetadata(url) |
494 | 707 |
495 def _Upload(): | 708 def _Upload(): |
496 io_fp = cStringIO.StringIO(self.file_contents[fpath]) | 709 io_fp = cStringIO.StringIO(file_data.data) |
497 with self._Time('UPLOAD_%d' % file_size, self.results['latency']): | 710 with self._Time('UPLOAD_%d' % file_size, self.results['latency']): |
498 self.gsutil_api.UploadObject( | 711 self.gsutil_api.UploadObject( |
499 io_fp, upload_target, size=file_size, provider=self.provider, | 712 io_fp, upload_target, size=file_size, provider=self.provider, |
500 fields=['name']) | 713 fields=['name']) |
501 self._RunOperation(_Upload) | 714 self._RunOperation(_Upload) |
502 | 715 |
503 def _Metadata(): | 716 def _Metadata(): |
504 with self._Time('METADATA_%d' % file_size, self.results['latency']): | 717 with self._Time('METADATA_%d' % file_size, self.results['latency']): |
505 return self.gsutil_api.GetObjectMetadata( | 718 return self.gsutil_api.GetObjectMetadata( |
506 url.bucket_name, url.object_name, | 719 url.bucket_name, url.object_name, |
507 provider=self.provider, fields=['name', 'contentType', | 720 provider=self.provider, fields=['name', 'contentType', |
508 'mediaLink', 'size']) | 721 'mediaLink', 'size']) |
509 # Download will get the metadata first if we don't pass it in. | 722 # Download will get the metadata first if we don't pass it in. |
510 download_metadata = self._RunOperation(_Metadata) | 723 download_metadata = self._RunOperation(_Metadata) |
511 serialization_dict = GetDownloadSerializationDict(download_metadata) | 724 serialization_data = GetDownloadSerializationData(download_metadata) |
512 serialization_data = json.dumps(serialization_dict) | |
513 | 725 |
514 def _Download(): | 726 def _Download(): |
515 with self._Time('DOWNLOAD_%d' % file_size, self.results['latency']): | 727 with self._Time('DOWNLOAD_%d' % file_size, self.results['latency']): |
516 self.gsutil_api.GetObjectMedia( | 728 self.gsutil_api.GetObjectMedia( |
517 url.bucket_name, url.object_name, self.discard_sink, | 729 url.bucket_name, url.object_name, self.discard_sink, |
518 provider=self.provider, serialization_data=serialization_data) | 730 provider=self.provider, serialization_data=serialization_data) |
519 self._RunOperation(_Download) | 731 self._RunOperation(_Download) |
520 | 732 |
521 def _Delete(): | 733 def _Delete(): |
522 with self._Time('DELETE_%d' % file_size, self.results['latency']): | 734 with self._Time('DELETE_%d' % file_size, self.results['latency']): |
523 self.gsutil_api.DeleteObject(url.bucket_name, url.object_name, | 735 self.gsutil_api.DeleteObject(url.bucket_name, url.object_name, |
524 provider=self.provider) | 736 provider=self.provider) |
525 self._RunOperation(_Delete) | 737 self._RunOperation(_Delete) |
526 | 738 |
527 class _CpFilter(logging.Filter): | 739 class _PerfdiagFilter(logging.Filter): |
528 | 740 |
529 def filter(self, record): | 741 def filter(self, record): |
530 # Used to prevent cp._LogCopyOperation from spewing output from | 742 # Used to prevent unnecessary output when using multiprocessing. |
531 # subprocesses about every iteration. | |
532 msg = record.getMessage() | 743 msg = record.getMessage() |
533 return not (('Copying file:///' in msg) or ('Copying gs://' in msg) or | 744 return not (('Copying file:///' in msg) or ('Copying gs://' in msg) or |
534 ('Computing CRC' in msg)) | 745 ('Computing CRC' in msg) or ('gsutil -m perfdiag' in msg)) |
535 | 746 |
536 def _PerfdiagExceptionHandler(self, e): | 747 def _PerfdiagExceptionHandler(self, e): |
537 """Simple exception handler to allow post-completion status.""" | 748 """Simple exception handler to allow post-completion status.""" |
538 self.logger.error(str(e)) | 749 self.logger.error(str(e)) |
539 | 750 |
540 def _RunReadThruTests(self): | 751 def PerformFannedDownload(self, need_to_slice, object_names, file_names, |
| 752 serialization_data): |
| 753 """Performs a parallel download of multiple objects using the fan strategy. |
| 754 |
| 755 Args: |
| 756 need_to_slice: If True, additionally apply the slice strategy to each |
| 757 object in object_names. |
| 758 object_names: A list of object names to be downloaded. Each object must |
| 759 already exist in the test bucket. |
| 760 file_names: A list, corresponding by index to object_names, of file names |
| 761 for downloaded data. If None, discard downloaded data. |
| 762 serialization_data: A list, corresponding by index to object_names, |
| 763 of serialization data for each object. |
| 764 """ |
| 765 args = [] |
| 766 for i in range(len(object_names)): |
| 767 file_name = file_names[i] if file_names else None |
| 768 args.append(FanDownloadTuple( |
| 769 need_to_slice, object_names[i], file_name, |
| 770 serialization_data[i])) |
| 771 self.Apply(_DownloadObject, args, _PerfdiagExceptionHandler, |
| 772 ('total_requests', 'request_errors'), |
| 773 arg_checker=DummyArgChecker, parallel_operations_override=True, |
| 774 process_count=self.processes, thread_count=self.threads) |
| 775 |
| 776 def PerformSlicedDownload(self, object_name, file_name, serialization_data): |
| 777 """Performs a download of an object using the slice strategy. |
| 778 |
| 779 Args: |
| 780 object_name: The name of the object to download. |
| 781 file_name: The name of the file to download data to, or None if data |
| 782 should be discarded. |
| 783 serialization_data: The serialization data for the object. |
| 784 """ |
| 785 if file_name: |
| 786 with open(file_name, 'ab') as fp: |
| 787 fp.truncate(self.thru_filesize) |
| 788 component_size = DivideAndCeil(self.thru_filesize, self.num_slices) |
| 789 args = [] |
| 790 for i in range(self.num_slices): |
| 791 start_byte = i * component_size |
| 792 end_byte = min((i + 1) * (component_size) - 1, self.thru_filesize - 1) |
| 793 args.append(SliceDownloadTuple(object_name, file_name, serialization_data, |
| 794 start_byte, end_byte)) |
| 795 self.Apply(_DownloadSlice, args, _PerfdiagExceptionHandler, |
| 796 ('total_requests', 'request_errors'), |
| 797 arg_checker=DummyArgChecker, parallel_operations_override=True, |
| 798 process_count=self.processes, thread_count=self.threads) |
| 799 |
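To make the byte-range bookkeeping above concrete, a small sketch with illustrative numbers (it assumes DivideAndCeil is plain ceiling division): a 10 MiB object split into 4 slices yields contiguous, non-overlapping ranges covering every byte once; the min() clamp only shortens the final range when the size is not an exact multiple of the slice count.

    # Illustrative only: 10 MiB object, 4 slices (not values from the patch).
    thru_filesize = 10 * 1024 * 1024
    num_slices = 4
    component_size = (thru_filesize + num_slices - 1) // num_slices  # ceiling division
    ranges = [(i * component_size,
               min((i + 1) * component_size - 1, thru_filesize - 1))
              for i in range(num_slices)]
    # ranges == [(0, 2621439), (2621440, 5242879),
    #            (5242880, 7864319), (7864320, 10485759)]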
| 800 def PerformFannedUpload(self, need_to_slice, file_names, object_names, |
| 801 use_file): |
| 802 """Performs a parallel upload of multiple files using the fan strategy. |
| 803 |
| 804 The metadata for file_name should be present in temp_file_dict prior |
| 805 to calling. Also, the data for file_name should be present in temp_file_dict |
| 806 if use_file is specified as False. |
| 807 |
| 808 Args: |
| 809 need_to_slice: If True, additionally apply the slice strategy to each |
| 810 file in file_names. |
| 811 file_names: A list of file names to be uploaded. |
| 812 object_names: A list, corresponding by index to file_names, of object |
| 813 names to upload data to. |
| 814 use_file: If true, use disk I/O, otherwise read upload data from memory. |
| 815 """ |
| 816 args = [] |
| 817 for i in range(len(file_names)): |
| 818 args.append(FanUploadTuple( |
| 819 need_to_slice, file_names[i], object_names[i], use_file)) |
| 820 self.Apply(_UploadObject, args, _PerfdiagExceptionHandler, |
| 821 ('total_requests', 'request_errors'), |
| 822 arg_checker=DummyArgChecker, parallel_operations_override=True, |
| 823 process_count=self.processes, thread_count=self.threads) |
| 824 |
| 825 def PerformSlicedUpload(self, file_name, object_name, use_file): |
| 826 """Performs a parallel upload of a file using the slice strategy. |
| 827 |
| 828 The metadata for file_name should be present in temp_file_dict prior |
| 829 to calling. Also, the data for file_name should be present in |
| 830 temp_file_dict if use_file is specified as False. |
| 831 |
| 832 Args: |
| 833 file_name: The name of the file to upload. |
| 834 object_name: The name of the object to upload to. |
| 835 use_file: If true, use disk I/O, otherwise read upload data from memory. |
| 836 """ |
| 837 # Divide the file into components. |
| 838 component_size = DivideAndCeil(self.thru_filesize, self.num_slices) |
| 839 component_object_names = ( |
| 840 [object_name + str(i) for i in range(self.num_slices)]) |
| 841 |
| 842 args = [] |
| 843 for i in range(self.num_slices): |
| 844 component_start = i * component_size |
| 845 component_size = min(component_size, |
| 846 temp_file_dict[file_name].size - component_start) |
| 847 args.append(SliceUploadTuple(file_name, component_object_names[i], |
| 848 use_file, component_start, component_size)) |
| 849 |
| 850 # Upload the components in parallel. |
| 851 try: |
| 852 self.Apply(_UploadSlice, args, _PerfdiagExceptionHandler, |
| 853 ('total_requests', 'request_errors'), |
| 854 arg_checker=DummyArgChecker, parallel_operations_override=True, |
| 855 process_count=self.processes, thread_count=self.threads) |
| 856 |
| 857 # Compose the components into an object. |
| 858 request_components = [] |
| 859 for i in range(self.num_slices): |
| 860 src_obj_metadata = ( |
| 861 apitools_messages.ComposeRequest.SourceObjectsValueListEntry( |
| 862 name=component_object_names[i])) |
| 863 request_components.append(src_obj_metadata) |
| 864 |
| 865 dst_obj_metadata = apitools_messages.Object() |
| 866 dst_obj_metadata.name = object_name |
| 867 dst_obj_metadata.bucket = self.bucket_url.bucket_name |
| 868 def _Compose(): |
| 869 self.gsutil_api.ComposeObject(request_components, dst_obj_metadata, |
| 870 provider=self.provider) |
| 871 self._RunOperation(_Compose) |
| 872 finally: |
| 873 # Delete the temporary components. |
| 874 self.Apply(_DeleteWrapper, component_object_names, |
| 875 _PerfdiagExceptionHandler, |
| 876 ('total_requests', 'request_errors'), |
| 877 arg_checker=DummyArgChecker, parallel_operations_override=True, |
| 878 process_count=self.processes, thread_count=self.threads) |
| 879 |
| 880 def _RunReadThruTests(self, use_file=False): |
541 """Runs read throughput tests.""" | 881 """Runs read throughput tests.""" |
| 882 test_name = 'read_throughput_file' if use_file else 'read_throughput' |
| 883 file_io_string = 'with file I/O' if use_file else '' |
542 self.logger.info( | 884 self.logger.info( |
543 '\nRunning read throughput tests (%s iterations of size %s)' % | 885 '\nRunning read throughput tests %s (%s objects of size %s)' % |
544 (self.num_iterations, MakeHumanReadable(self.thru_filesize))) | 886 (file_io_string, self.num_objects, |
545 | 887 MakeHumanReadable(self.thru_filesize))) |
546 self.results['read_throughput'] = {'file_size': self.thru_filesize, | 888 self._WarnIfLargeData() |
547 'num_times': self.num_iterations, | 889 |
548 'processes': self.processes, | 890 self.results[test_name] = {'file_size': self.thru_filesize, |
549 'threads': self.threads} | 891 'processes': self.processes, |
550 | 892 'threads': self.threads, |
551 # Copy the TCP warmup file. | 893 'parallelism': self.parallel_strategy |
552 warmup_url = self.bucket_url.Clone() | 894 } |
553 warmup_url.object_name = os.path.basename(self.tcp_warmup_file) | 895 |
554 warmup_target = StorageUrlToUploadObjectMetadata(warmup_url) | 896 # Copy the file(s) to the test bucket, and also get the serialization data |
555 self.test_object_names.add(warmup_url.object_name) | 897 # so that we can pass it to download. |
556 | 898 if use_file: |
557 def _Upload1(): | 899 # For test with file I/O use N files on disk to preserve seek performance. |
558 self.gsutil_api.UploadObject( | 900 file_names = self.thru_file_names |
559 cStringIO.StringIO(self.file_contents[self.tcp_warmup_file]), | 901 object_names = self.thru_object_names |
560 warmup_target, provider=self.provider, fields=['name']) | 902 serialization_data = [] |
561 self._RunOperation(_Upload1) | 903 for i in range(self.num_objects): |
562 | 904 self.temporary_objects.add(self.thru_object_names[i]) |
563 # Copy the file to remote location before reading. | 905 if self.WTHRU_FILE in self.diag_tests: |
564 thru_url = self.bucket_url.Clone() | 906 # If we ran the WTHRU_FILE test, then the objects already exist. |
565 thru_url.object_name = os.path.basename(self.thru_local_file) | 907 obj_metadata = self.gsutil_api.GetObjectMetadata( |
566 thru_target = StorageUrlToUploadObjectMetadata(thru_url) | 908 self.bucket_url.bucket_name, self.thru_object_names[i], |
567 thru_target.md5Hash = self.file_md5s[self.thru_local_file] | 909 fields=['size', 'mediaLink'], provider=self.bucket_url.scheme) |
568 self.test_object_names.add(thru_url.object_name) | 910 else: |
569 | 911 obj_metadata = self.Upload(self.thru_file_names[i], |
570 # Get the mediaLink here so that we can pass it to download. | 912 self.thru_object_names[i], use_file) |
571 def _Upload2(): | 913 |
572 return self.gsutil_api.UploadObject( | 914 # File overwrite causes performance issues with sliced downloads. |
573 cStringIO.StringIO(self.file_contents[self.thru_local_file]), | 915 # Delete the file and reopen it for download. This matches what a real |
574 thru_target, provider=self.provider, size=self.thru_filesize, | 916 # download would look like. |
575 fields=['name', 'mediaLink', 'size']) | 917 os.unlink(self.thru_file_names[i]) |
576 | 918 open(self.thru_file_names[i], 'ab').close() |
577 # Get the metadata for the object so that we are just measuring performance | 919 serialization_data.append(GetDownloadSerializationData(obj_metadata)) |
578 # on the actual bytes transfer. | 920 else: |
579 download_metadata = self._RunOperation(_Upload2) | 921 # For in-memory test only use one file but copy it num_objects times, to |
580 serialization_dict = GetDownloadSerializationDict(download_metadata) | 922 # allow scalability in num_objects. |
581 serialization_data = json.dumps(serialization_dict) | 923 self.temporary_objects.add(self.mem_thru_object_name) |
582 | 924 obj_metadata = self.Upload(self.mem_thru_file_name, |
| 925 self.mem_thru_object_name, use_file) |
| 926 file_names = None |
| 927 object_names = [self.mem_thru_object_name] * self.num_objects |
| 928 serialization_data = ( |
| 929 [GetDownloadSerializationData(obj_metadata)] * self.num_objects) |
| 930 |
| 931 # Warmup the TCP connection. |
| 932 warmup_obj_name = os.path.basename(self.tcp_warmup_file) |
| 933 self.temporary_objects.add(warmup_obj_name) |
| 934 self.Upload(self.tcp_warmup_file, warmup_obj_name) |
| 935 self.Download(warmup_obj_name) |
| 936 |
| 937 t0 = time.time() |
583 if self.processes == 1 and self.threads == 1: | 938 if self.processes == 1 and self.threads == 1: |
584 | 939 for i in range(self.num_objects): |
585 # Warm up the TCP connection. | 940 file_name = file_names[i] if use_file else None |
586 def _Warmup(): | 941 self.Download(object_names[i], file_name, serialization_data[i]) |
587 self.gsutil_api.GetObjectMedia(warmup_url.bucket_name, | |
588 warmup_url.object_name, | |
589 self.discard_sink, | |
590 provider=self.provider) | |
591 self._RunOperation(_Warmup) | |
592 | |
593 times = [] | |
594 | |
595 def _Download(): | |
596 t0 = time.time() | |
597 self.gsutil_api.GetObjectMedia( | |
598 thru_url.bucket_name, thru_url.object_name, self.discard_sink, | |
599 provider=self.provider, serialization_data=serialization_data) | |
600 t1 = time.time() | |
601 times.append(t1 - t0) | |
602 for _ in range(self.num_iterations): | |
603 self._RunOperation(_Download) | |
604 time_took = sum(times) | |
605 else: | 942 else: |
606 args = ([(thru_url.bucket_name, thru_url.object_name, serialization_data)] | 943 if self.parallel_strategy in (self.FAN, self.BOTH): |
607 * self.num_iterations) | 944 need_to_slice = (self.parallel_strategy == self.BOTH) |
608 self.logger.addFilter(self._CpFilter()) | 945 self.PerformFannedDownload(need_to_slice, object_names, file_names, |
609 | 946 serialization_data) |
610 t0 = time.time() | 947 elif self.parallel_strategy == self.SLICE: |
611 self.Apply(_DownloadWrapper, | 948 for i in range(self.num_objects): |
612 args, | 949 file_name = file_names[i] if use_file else None |
613 _PerfdiagExceptionHandler, | 950 self.PerformSlicedDownload( |
614 arg_checker=DummyArgChecker, | 951 object_names[i], file_name, serialization_data[i]) |
615 parallel_operations_override=True, | 952 t1 = time.time() |
616 process_count=self.processes, | 953 |
617 thread_count=self.threads) | 954 time_took = t1 - t0 |
618 t1 = time.time() | 955 total_bytes_copied = self.thru_filesize * self.num_objects |
619 time_took = t1 - t0 | |
620 | |
621 total_bytes_copied = self.thru_filesize * self.num_iterations | |
622 bytes_per_second = total_bytes_copied / time_took | 956 bytes_per_second = total_bytes_copied / time_took |
623 | 957 |
624 self.results['read_throughput']['time_took'] = time_took | 958 self.results[test_name]['time_took'] = time_took |
625 self.results['read_throughput']['total_bytes_copied'] = total_bytes_copied | 959 self.results[test_name]['total_bytes_copied'] = total_bytes_copied |
626 self.results['read_throughput']['bytes_per_second'] = bytes_per_second | 960 self.results[test_name]['bytes_per_second'] = bytes_per_second |
627 | 961 |
628 def _RunWriteThruTests(self): | 962 def _RunWriteThruTests(self, use_file=False): |
629 """Runs write throughput tests.""" | 963 """Runs write throughput tests.""" |
| 964 test_name = 'write_throughput_file' if use_file else 'write_throughput' |
| 965 file_io_string = 'with file I/O' if use_file else '' |
630 self.logger.info( | 966 self.logger.info( |
631 '\nRunning write throughput tests (%s iterations of size %s)' % | 967 '\nRunning write throughput tests %s (%s objects of size %s)' % |
632 (self.num_iterations, MakeHumanReadable(self.thru_filesize))) | 968 (file_io_string, self.num_objects, |
633 | 969 MakeHumanReadable(self.thru_filesize))) |
634 self.results['write_throughput'] = {'file_size': self.thru_filesize, | 970 self._WarnIfLargeData() |
635 'num_copies': self.num_iterations, | 971 |
636 'processes': self.processes, | 972 self.results[test_name] = {'file_size': self.thru_filesize, |
637 'threads': self.threads} | 973 'processes': self.processes, |
638 | 974 'threads': self.threads, |
639 warmup_url = self.bucket_url.Clone() | 975 'parallelism': self.parallel_strategy} |
640 warmup_url.object_name = os.path.basename(self.tcp_warmup_file) | 976 |
641 warmup_target = StorageUrlToUploadObjectMetadata(warmup_url) | 977 # Warmup the TCP connection. |
642 self.test_object_names.add(warmup_url.object_name) | 978 warmup_obj_name = os.path.basename(self.tcp_warmup_file) |
643 | 979 self.temporary_objects.add(warmup_obj_name) |
644 thru_url = self.bucket_url.Clone() | 980 self.Upload(self.tcp_warmup_file, warmup_obj_name) |
645 thru_url.object_name = os.path.basename(self.thru_local_file) | 981 |
646 thru_target = StorageUrlToUploadObjectMetadata(thru_url) | 982 if use_file: |
647 thru_tuples = [] | 983 # For test with file I/O use N files on disk to preserve seek performance. |
648 for i in xrange(self.num_iterations): | 984 file_names = self.thru_file_names |
649 # Create a unique name for each uploaded object. Otherwise, | 985 object_names = self.thru_object_names |
650 # the XML API would fail when trying to non-atomically get metadata | 986 else: |
651 # for the object that gets blown away by the overwrite. | 987 # For in-memory test only use one file but copy it num_objects times, to |
652 remote_object_name = thru_target.name + str(i) | 988 # allow for scalability in num_objects. |
653 self.test_object_names.add(remote_object_name) | 989 file_names = [self.mem_thru_file_name] * self.num_objects |
654 thru_tuples.append(UploadObjectTuple(thru_target.bucket, | 990 object_names = ( |
655 remote_object_name, | 991 [self.mem_thru_object_name + str(i) for i in range(self.num_objects)]) |
656 filepath=self.thru_local_file)) | 992 |
657 | 993 for object_name in object_names: |
| 994 self.temporary_objects.add(object_name) |
| 995 |
| 996 t0 = time.time() |
658 if self.processes == 1 and self.threads == 1: | 997 if self.processes == 1 and self.threads == 1: |
659 # Warm up the TCP connection. | 998 for i in range(self.num_objects): |
660 def _Warmup(): | 999 self.Upload(file_names[i], object_names[i], use_file) |
661 self.gsutil_api.UploadObject( | |
662 cStringIO.StringIO(self.file_contents[self.tcp_warmup_file]), | |
663 warmup_target, provider=self.provider, size=self.thru_filesize, | |
664 fields=['name']) | |
665 self._RunOperation(_Warmup) | |
666 | |
667 times = [] | |
668 | |
669 for i in xrange(self.num_iterations): | |
670 thru_tuple = thru_tuples[i] | |
671 def _Upload(): | |
672 """Uploads the write throughput measurement object.""" | |
673 upload_target = apitools_messages.Object( | |
674 bucket=thru_tuple.bucket_name, name=thru_tuple.object_name, | |
675 md5Hash=thru_tuple.md5) | |
676 io_fp = cStringIO.StringIO(self.file_contents[self.thru_local_file]) | |
677 t0 = time.time() | |
678 if self.thru_filesize < ResumableThreshold(): | |
679 self.gsutil_api.UploadObject( | |
680 io_fp, upload_target, provider=self.provider, | |
681 size=self.thru_filesize, fields=['name']) | |
682 else: | |
683 self.gsutil_api.UploadObjectResumable( | |
684 io_fp, upload_target, provider=self.provider, | |
685 size=self.thru_filesize, fields=['name'], | |
686 tracker_callback=_DummyTrackerCallback) | |
687 | |
688 t1 = time.time() | |
689 times.append(t1 - t0) | |
690 | |
691 self._RunOperation(_Upload) | |
692 time_took = sum(times) | |
693 | |
694 else: | 1000 else: |
695 args = thru_tuples | 1001 if self.parallel_strategy in (self.FAN, self.BOTH): |
696 t0 = time.time() | 1002 need_to_slice = (self.parallel_strategy == self.BOTH) |
697 self.Apply(_UploadWrapper, | 1003 self.PerformFannedUpload(need_to_slice, file_names, object_names, |
698 args, | 1004 use_file) |
699 _PerfdiagExceptionHandler, | 1005 elif self.parallel_strategy == self.SLICE: |
700 arg_checker=DummyArgChecker, | 1006 for i in range(self.num_objects): |
701 parallel_operations_override=True, | 1007 self.PerformSlicedUpload(file_names[i], object_names[i], use_file) |
702 process_count=self.processes, | 1008 t1 = time.time() |
703 thread_count=self.threads) | 1009 |
704 t1 = time.time() | 1010 time_took = t1 - t0 |
705 time_took = t1 - t0 | 1011 total_bytes_copied = self.thru_filesize * self.num_objects |
706 | |
707 total_bytes_copied = self.thru_filesize * self.num_iterations | |
708 bytes_per_second = total_bytes_copied / time_took | 1012 bytes_per_second = total_bytes_copied / time_took |
709 | 1013 |
710 self.results['write_throughput']['time_took'] = time_took | 1014 self.results[test_name]['time_took'] = time_took |
711 self.results['write_throughput']['total_bytes_copied'] = total_bytes_copied | 1015 self.results[test_name]['total_bytes_copied'] = total_bytes_copied |
712 self.results['write_throughput']['bytes_per_second'] = bytes_per_second | 1016 self.results[test_name]['bytes_per_second'] = bytes_per_second |
713 | 1017 |
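The FAN and SLICE strategies used above differ in the unit of parallel work: fanning hands each worker a whole (file, object) pair, while slicing splits a single object into contiguous byte ranges. A rough sketch of how slice boundaries could be computed; compute_slices is a hypothetical helper, not the PerformSlicedUpload/PerformSlicedDownload implementation:

def compute_slices(file_size, num_slices):
  """Splits [0, file_size) into num_slices contiguous (start, length) pairs."""
  component_size = file_size // num_slices
  slices = []
  for i in range(num_slices):
    start = i * component_size
    # The last slice absorbs any remainder bytes.
    length = file_size - start if i == num_slices - 1 else component_size
    slices.append((start, length))
  return slices

# Example: a 10 MiB object split across 4 slices.
print compute_slices(10 * 1024 * 1024, 4)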
714 def _RunListTests(self): | 1018 def _RunListTests(self): |
715 """Runs eventual consistency listing latency tests.""" | 1019 """Runs eventual consistency listing latency tests.""" |
716 self.results['listing'] = {'num_files': self.num_iterations} | 1020 self.results['listing'] = {'num_files': self.num_objects} |
717 | 1021 |
718 # Generate N random object names to put in the bucket. | 1022 # Generate N random objects to put into the bucket. |
719 list_prefix = 'gsutil-perfdiag-list-' | 1023 list_prefix = 'gsutil-perfdiag-list-' |
| 1024 list_fpaths = [] |
720 list_objects = [] | 1025 list_objects = [] |
721 for _ in xrange(self.num_iterations): | 1026 args = [] |
722 list_object_name = u'%s%s' % (list_prefix, os.urandom(20).encode('hex')) | 1027 for _ in xrange(self.num_objects): |
723 self.test_object_names.add(list_object_name) | 1028 fpath = self._MakeTempFile(0, mem_data=True, mem_metadata=True, |
724 list_objects.append(list_object_name) | 1029 prefix=list_prefix) |
| 1030 list_fpaths.append(fpath) |
| 1031 object_name = os.path.basename(fpath) |
| 1032 list_objects.append(object_name) |
| 1033 args.append(FanUploadTuple(False, fpath, object_name, False)) |
| 1034 self.temporary_objects.add(object_name) |
725 | 1035 |
726 # Add the objects to the bucket. | 1036 # Add the objects to the bucket. |
727 self.logger.info( | 1037 self.logger.info( |
728 '\nWriting %s objects for listing test...', self.num_iterations) | 1038 '\nWriting %s objects for listing test...', self.num_objects) |
729 empty_md5 = CalculateB64EncodedMd5FromContents(cStringIO.StringIO('')) | 1039 |
730 args = [ | 1040 self.Apply(_UploadObject, args, _PerfdiagExceptionHandler, |
731 UploadObjectTuple(self.bucket_url.bucket_name, name, md5=empty_md5, | |
732 contents='') for name in list_objects] | |
733 self.Apply(_UploadWrapper, args, _PerfdiagExceptionHandler, | |
734 arg_checker=DummyArgChecker) | 1041 arg_checker=DummyArgChecker) |
735 | 1042 |
736 list_latencies = [] | 1043 list_latencies = [] |
737 files_seen = [] | 1044 files_seen = [] |
738 total_start_time = time.time() | 1045 total_start_time = time.time() |
739 expected_objects = set(list_objects) | 1046 expected_objects = set(list_objects) |
740 found_objects = set() | 1047 found_objects = set() |
741 | 1048 |
742 def _List(): | 1049 def _List(): |
743 """Lists and returns objects in the bucket. Also records latency.""" | 1050 """Lists and returns objects in the bucket. Also records latency.""" |
744 t0 = time.time() | 1051 t0 = time.time() |
745 objects = list(self.gsutil_api.ListObjects( | 1052 objects = list(self.gsutil_api.ListObjects( |
746 self.bucket_url.bucket_name, prefix=list_prefix, delimiter='/', | 1053 self.bucket_url.bucket_name, delimiter='/', |
747 provider=self.provider, fields=['items/name'])) | 1054 provider=self.provider, fields=['items/name'])) |
748 t1 = time.time() | 1055 t1 = time.time() |
749 list_latencies.append(t1 - t0) | 1056 list_latencies.append(t1 - t0) |
750 return set([obj.data.name for obj in objects]) | 1057 return set([obj.data.name for obj in objects]) |
751 | 1058 |
752 self.logger.info( | 1059 self.logger.info( |
753 'Listing bucket %s waiting for %s objects to appear...', | 1060 'Listing bucket %s waiting for %s objects to appear...', |
754 self.bucket_url.bucket_name, self.num_iterations) | 1061 self.bucket_url.bucket_name, self.num_objects) |
755 while expected_objects - found_objects: | 1062 while expected_objects - found_objects: |
756 def _ListAfterUpload(): | 1063 def _ListAfterUpload(): |
757 names = _List() | 1064 names = _List() |
758 found_objects.update(names & expected_objects) | 1065 found_objects.update(names & expected_objects) |
759 files_seen.append(len(found_objects)) | 1066 files_seen.append(len(found_objects)) |
760 self._RunOperation(_ListAfterUpload) | 1067 self._RunOperation(_ListAfterUpload) |
761 if expected_objects - found_objects: | 1068 if expected_objects - found_objects: |
762 if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME: | 1069 if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME: |
763 self.logger.warning('Maximum time reached waiting for listing.') | 1070 self.logger.warning('Maximum time reached waiting for listing.') |
764 break | 1071 break |
765 total_end_time = time.time() | 1072 total_end_time = time.time() |
766 | 1073 |
767 self.results['listing']['insert'] = { | 1074 self.results['listing']['insert'] = { |
768 'num_listing_calls': len(list_latencies), | 1075 'num_listing_calls': len(list_latencies), |
769 'list_latencies': list_latencies, | 1076 'list_latencies': list_latencies, |
770 'files_seen_after_listing': files_seen, | 1077 'files_seen_after_listing': files_seen, |
771 'time_took': total_end_time - total_start_time, | 1078 'time_took': total_end_time - total_start_time, |
772 } | 1079 } |
773 | 1080 |
| 1081 args = [object_name for object_name in list_objects] |
774 self.logger.info( | 1082 self.logger.info( |
775 'Deleting %s objects for listing test...', self.num_iterations) | 1083 'Deleting %s objects for listing test...', self.num_objects) |
776 self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler, | 1084 self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler, |
777 arg_checker=DummyArgChecker) | 1085 arg_checker=DummyArgChecker) |
778 | 1086 |
779 self.logger.info( | 1087 self.logger.info( |
780 'Listing bucket %s waiting for %s objects to disappear...', | 1088 'Listing bucket %s waiting for %s objects to disappear...', |
781 self.bucket_url.bucket_name, self.num_iterations) | 1089 self.bucket_url.bucket_name, self.num_objects) |
782 list_latencies = [] | 1090 list_latencies = [] |
783 files_seen = [] | 1091 files_seen = [] |
784 total_start_time = time.time() | 1092 total_start_time = time.time() |
785 found_objects = set(list_objects) | 1093 found_objects = set(list_objects) |
786 while found_objects: | 1094 while found_objects: |
787 def _ListAfterDelete(): | 1095 def _ListAfterDelete(): |
788 names = _List() | 1096 names = _List() |
789 found_objects.intersection_update(names) | 1097 found_objects.intersection_update(names) |
790 files_seen.append(len(found_objects)) | 1098 files_seen.append(len(found_objects)) |
791 self._RunOperation(_ListAfterDelete) | 1099 self._RunOperation(_ListAfterDelete) |
792 if found_objects: | 1100 if found_objects: |
793 if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME: | 1101 if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME: |
794 self.logger.warning('Maximum time reached waiting for listing.') | 1102 self.logger.warning('Maximum time reached waiting for listing.') |
795 break | 1103 break |
796 total_end_time = time.time() | 1104 total_end_time = time.time() |
797 | 1105 |
798 self.results['listing']['delete'] = { | 1106 self.results['listing']['delete'] = { |
799 'num_listing_calls': len(list_latencies), | 1107 'num_listing_calls': len(list_latencies), |
800 'list_latencies': list_latencies, | 1108 'list_latencies': list_latencies, |
801 'files_seen_after_listing': files_seen, | 1109 'files_seen_after_listing': files_seen, |
802 'time_took': total_end_time - total_start_time, | 1110 'time_took': total_end_time - total_start_time, |
803 } | 1111 } |
804 | 1112 |
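Both halves of the listing test above follow the same bounded polling pattern: list repeatedly until the expected set of names converges or MAX_LISTING_WAIT_TIME elapses. A condensed, self-contained sketch of that pattern (poll_fn and max_wait_seconds are placeholder names, not the module's own):

import time

def wait_for_convergence(poll_fn, expected, max_wait_seconds=60.0):
  """Polls poll_fn() until it covers `expected` or the time limit expires."""
  latencies = []
  found = set()
  deadline = time.time() + max_wait_seconds
  while expected - found:
    t0 = time.time()
    found.update(poll_fn() & expected)
    latencies.append(time.time() - t0)
    if expected - found and time.time() > deadline:
      return False, latencies
  return True, latencies

# Example with a fake listing call that is immediately consistent.
print wait_for_convergence(lambda: set(['a', 'b']), set(['a', 'b']))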
805 def Upload(self, thru_tuple, thread_state=None): | 1113 def Upload(self, file_name, object_name, use_file=False, file_start=0, |
806 gsutil_api = GetCloudApiInstance(self, thread_state) | 1114 file_size=None): |
| 1115 """Performs an upload to the test bucket. |
807 | 1116 |
808 md5hash = thru_tuple.md5 | 1117 The file is uploaded to the bucket referred to by self.bucket_url, and has |
809 contents = thru_tuple.contents | 1118 name object_name. |
810 if thru_tuple.filepath: | |
811 md5hash = self.file_md5s[thru_tuple.filepath] | |
812 contents = self.file_contents[thru_tuple.filepath] | |
813 | |
814 upload_target = apitools_messages.Object( | |
815 bucket=thru_tuple.bucket_name, name=thru_tuple.object_name, | |
816 md5Hash=md5hash) | |
817 file_size = len(contents) | |
818 if file_size < ResumableThreshold(): | |
819 gsutil_api.UploadObject( | |
820 cStringIO.StringIO(contents), upload_target, | |
821 provider=self.provider, size=file_size, fields=['name']) | |
822 else: | |
823 gsutil_api.UploadObjectResumable( | |
824 cStringIO.StringIO(contents), upload_target, | |
825 provider=self.provider, size=file_size, fields=['name'], | |
826 tracker_callback=_DummyTrackerCallback) | |
827 | |
828 def Download(self, download_tuple, thread_state=None): | |
829 """Downloads a file. | |
830 | 1119 |
831 Args: | 1120 Args: |
832 download_tuple: (bucket name, object name, serialization data for object). | 1121 file_name: The path to the local file, and the key to its entry in |
833 thread_state: gsutil Cloud API instance to use for the download. | 1122 temp_file_dict. |
| 1123 object_name: The name of the remote object. |
| 1124 use_file: If true, use disk I/O, otherwise read everything from memory. |
| 1125 file_start: The first byte in the file to upload to the object. |
| 1126 (should only be specified for sliced uploads)
| 1127 file_size: The size of the file to upload. |
| 1128 (should only be specified for sliced uploads)
| 1129 |
| 1130 Returns: |
| 1131 Uploaded Object Metadata. |
834 """ | 1132 """ |
835 gsutil_api = GetCloudApiInstance(self, thread_state) | 1133 fp = None |
836 gsutil_api.GetObjectMedia( | 1134 if file_size is None: |
837 download_tuple[0], download_tuple[1], self.discard_sink, | 1135 file_size = temp_file_dict[file_name].size |
838 provider=self.provider, serialization_data=download_tuple[2]) | |
839 | 1136 |
840 def Delete(self, thru_tuple, thread_state=None): | 1137 upload_url = self.bucket_url.Clone() |
841 gsutil_api = thread_state or self.gsutil_api | 1138 upload_url.object_name = object_name |
842 gsutil_api.DeleteObject( | 1139 upload_target = StorageUrlToUploadObjectMetadata(upload_url) |
843 thru_tuple.bucket_name, thru_tuple.object_name, provider=self.provider) | 1140 |
| 1141 try: |
| 1142 if use_file: |
| 1143 fp = FilePart(file_name, file_start, file_size) |
| 1144 else: |
| 1145 data = temp_file_dict[file_name].data[file_start:file_start+file_size] |
| 1146 fp = cStringIO.StringIO(data) |
| 1147 |
| 1148 def _InnerUpload(): |
| 1149 if file_size < ResumableThreshold(): |
| 1150 return self.gsutil_api.UploadObject( |
| 1151 fp, upload_target, provider=self.provider, size=file_size, |
| 1152 fields=['name', 'mediaLink', 'size']) |
| 1153 else: |
| 1154 return self.gsutil_api.UploadObjectResumable( |
| 1155 fp, upload_target, provider=self.provider, size=file_size, |
| 1156 fields=['name', 'mediaLink', 'size'], |
| 1157 tracker_callback=_DummyTrackerCallback) |
| 1158 return self._RunOperation(_InnerUpload) |
| 1159 finally: |
| 1160 if fp: |
| 1161 fp.close() |
| 1162 |
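For a sliced upload, each worker would call Upload with a disjoint (file_start, file_size) window over the same local file. A hedged sketch of such a caller; it only illustrates how the parameters above compose, and omits the step where the real strategy recombines the uploaded components into object_name:

def upload_in_slices(perfdiag_cmd, file_name, object_name, file_size,
                     num_slices):
  """Illustrative only: uploads each byte range of one file as a component."""
  component_size = file_size // num_slices
  component_names = []
  for i in range(num_slices):
    start = i * component_size
    length = file_size - start if i == num_slices - 1 else component_size
    component = '%s_%d' % (object_name, i)
    component_names.append(component)
    perfdiag_cmd.Upload(file_name, component, use_file=True,
                        file_start=start, file_size=length)
  return component_names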
| 1163 def Download(self, object_name, file_name=None, serialization_data=None, |
| 1164 start_byte=0, end_byte=None): |
| 1165 """Downloads an object from the test bucket. |
| 1166 |
| 1167 Args: |
| 1168 object_name: The name of the object (in the test bucket) to download. |
| 1169 file_name: Optional file name to write downloaded data to. If None, |
| 1170 downloaded data is discarded immediately. |
| 1171 serialization_data: Optional serialization data, used so that we don't |
| 1172 have to get the metadata before downloading. |
| 1173 start_byte: The first byte in the object to download. |
| 1174 (should only be specified for sliced downloads)
| 1175 end_byte: The last byte in the object to download. |
| 1176 (should only be specified for sliced downloads)
| 1177 """ |
| 1178 fp = None |
| 1179 try: |
| 1180 if file_name is not None: |
| 1181 fp = open(file_name, 'r+b') |
| 1182 fp.seek(start_byte) |
| 1183 else: |
| 1184 fp = self.discard_sink |
| 1185 |
| 1186 def _InnerDownload(): |
| 1187 self.gsutil_api.GetObjectMedia( |
| 1188 self.bucket_url.bucket_name, object_name, fp, |
| 1189 provider=self.provider, start_byte=start_byte, end_byte=end_byte, |
| 1190 serialization_data=serialization_data) |
| 1191 self._RunOperation(_InnerDownload) |
| 1192 finally: |
| 1193 if fp: |
| 1194 fp.close() |
| 1195 |
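When file_name is None the data lands in self.discard_sink, which is created elsewhere in this command. A minimal stand-in with the same write-and-forget behavior could look like this (purely illustrative, not the module's actual sink class):

class DiscardSink(object):
  """File-like object that counts, then discards, everything written to it."""

  def __init__(self):
    self.bytes_discarded = 0

  def write(self, data):
    self.bytes_discarded += len(data)

  def close(self):
    pass  # Nothing to release; safe to call from the finally block above.

sink = DiscardSink()
sink.write('x' * 1024)
print sink.bytes_discarded  # 1024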
| 1196 def Delete(self, object_name): |
| 1197 """Deletes an object from the test bucket. |
| 1198 |
| 1199 Args: |
| 1200 object_name: The name of the object to delete. |
| 1201 """ |
| 1202 try: |
| 1203 def _InnerDelete(): |
| 1204 self.gsutil_api.DeleteObject(self.bucket_url.bucket_name, object_name, |
| 1205 provider=self.provider) |
| 1206 self._RunOperation(_InnerDelete) |
| 1207 except NotFoundException: |
| 1208 pass |
844 | 1209 |
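Since Delete already tolerates missing objects, cleanup code (such as the _TearDown step invoked later in RunCommand) can simply sweep the recorded names. A small sketch of that idea, assuming a populated temporary_objects set:

def cleanup_temporary_objects(perfdiag_cmd):
  """Best-effort removal of every object name recorded during the tests."""
  for object_name in list(perfdiag_cmd.temporary_objects):
    # Delete swallows NotFoundException, so already-removed objects are fine.
    perfdiag_cmd.Delete(object_name)
    perfdiag_cmd.temporary_objects.discard(object_name)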
845 def _GetDiskCounters(self): | 1210 def _GetDiskCounters(self): |
846 """Retrieves disk I/O statistics for all disks. | 1211 """Retrieves disk I/O statistics for all disks. |
847 | 1212 |
848 Adapted from the psutil module's psutil._pslinux.disk_io_counters: | 1213 Adapted from the psutil module's psutil._pslinux.disk_io_counters: |
849 http://code.google.com/p/psutil/source/browse/trunk/psutil/_pslinux.py | 1214 http://code.google.com/p/psutil/source/browse/trunk/psutil/_pslinux.py |
850 | 1215 |
851 Originally distributed under a BSD license. | 1216 Originally distributed under a BSD license.
852 Original Copyright (c) 2009, Jay Loden, Dave Daeschler, Giampaolo Rodola. | 1217 Original Copyright (c) 2009, Jay Loden, Dave Daeschler, Giampaolo Rodola. |
853 | 1218 |
(...skipping 105 matching lines...)
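On Linux these counters come from /proc/diskstats; a simplified reader, assuming the standard field layout and a 512-byte sector size, might look like the sketch below rather than the full psutil-derived implementation elided here:

def read_disk_counters(path='/proc/diskstats'):
  """Returns {device: (reads, writes, bytes_read, bytes_written)}."""
  sector_size = 512  # Assumed; the kernel reports sectors, not bytes.
  counters = {}
  with open(path) as f:
    for line in f:
      fields = line.split()
      if len(fields) < 14:
        continue  # Skip entries using the short per-partition format.
      name = fields[2]
      reads, sectors_read = int(fields[3]), int(fields[5])
      writes, sectors_written = int(fields[7]), int(fields[9])
      counters[name] = (reads, writes,
                        sectors_read * sector_size,
                        sectors_written * sector_size)
  return counters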
959 self.logger.info('DNS lookups are disallowed in this environment, so ' | 1324 self.logger.info('DNS lookups are disallowed in this environment, so ' |
960 'some information is not included in this perfdiag run.') | 1325 'some information is not included in this perfdiag run.') |
961 | 1326 |
962 # Get the local IP address from socket lib. | 1327 # Get the local IP address from socket lib. |
963 try: | 1328 try: |
964 sysinfo['ip_address'] = socket.gethostbyname(socket.gethostname()) | 1329 sysinfo['ip_address'] = socket.gethostbyname(socket.gethostname()) |
965 except socket_errors: | 1330 except socket_errors: |
966 sysinfo['ip_address'] = '' | 1331 sysinfo['ip_address'] = '' |
967 # Record the temporary directory used since it can affect performance, e.g. | 1332 # Record the temporary directory used since it can affect performance, e.g. |
968 # when on a networked filesystem. | 1333 # when on a networked filesystem. |
969 sysinfo['tempdir'] = tempfile.gettempdir() | 1334 sysinfo['tempdir'] = self.directory |
970 | 1335 |
971 # Produces an RFC 2822 compliant GMT timestamp. | 1336 # Produces an RFC 2822 compliant GMT timestamp. |
972 sysinfo['gmt_timestamp'] = time.strftime('%a, %d %b %Y %H:%M:%S +0000', | 1337 sysinfo['gmt_timestamp'] = time.strftime('%a, %d %b %Y %H:%M:%S +0000', |
973 time.gmtime()) | 1338 time.gmtime()) |
974 | 1339 |
975 # Execute a CNAME lookup on Google DNS to find what Google server | 1340 # Execute a CNAME lookup on Google DNS to find what Google server |
976 # it's routing to. | 1341 # it's routing to. |
977 cmd = ['nslookup', '-type=CNAME', self.XML_API_HOST] | 1342 cmd = ['nslookup', '-type=CNAME', self.XML_API_HOST] |
978 try: | 1343 try: |
979 nslookup_cname_output = self._Exec(cmd, return_output=True) | 1344 nslookup_cname_output = self._Exec(cmd, return_output=True) |
(...skipping 191 matching lines...)
1171 print 'Delete'.rjust(9), '', | 1536 print 'Delete'.rjust(9), '', |
1172 print MakeHumanReadable(numbytes).rjust(9), '', | 1537 print MakeHumanReadable(numbytes).rjust(9), '', |
1173 self._DisplayStats(trials) | 1538 self._DisplayStats(trials) |
1174 | 1539 |
1175 if 'write_throughput' in self.results: | 1540 if 'write_throughput' in self.results: |
1176 print | 1541 print |
1177 print '-' * 78 | 1542 print '-' * 78 |
1178 print 'Write Throughput'.center(78) | 1543 print 'Write Throughput'.center(78) |
1179 print '-' * 78 | 1544 print '-' * 78 |
1180 write_thru = self.results['write_throughput'] | 1545 write_thru = self.results['write_throughput'] |
1181 print 'Copied a %s file %d times for a total transfer size of %s.' % ( | 1546 print 'Copied %s %s file(s) for a total transfer size of %s.' % ( |
| 1547 self.num_objects, |
1182 MakeHumanReadable(write_thru['file_size']), | 1548 MakeHumanReadable(write_thru['file_size']), |
1183 write_thru['num_copies'], | |
1184 MakeHumanReadable(write_thru['total_bytes_copied'])) | 1549 MakeHumanReadable(write_thru['total_bytes_copied'])) |
1185 print 'Write throughput: %s/s.' % ( | 1550 print 'Write throughput: %s/s.' % ( |
1186 MakeBitsHumanReadable(write_thru['bytes_per_second'] * 8)) | 1551 MakeBitsHumanReadable(write_thru['bytes_per_second'] * 8)) |
| 1552 print 'Parallelism strategy: %s' % write_thru['parallelism'] |
| 1553 |
| 1554 if 'write_throughput_file' in self.results: |
| 1555 print |
| 1556 print '-' * 78 |
| 1557 print 'Write Throughput With File I/O'.center(78) |
| 1558 print '-' * 78 |
| 1559 write_thru_file = self.results['write_throughput_file'] |
| 1560 print 'Copied %s %s file(s) for a total transfer size of %s.' % ( |
| 1561 self.num_objects, |
| 1562 MakeHumanReadable(write_thru_file['file_size']), |
| 1563 MakeHumanReadable(write_thru_file['total_bytes_copied'])) |
| 1564 print 'Write throughput: %s/s.' % ( |
| 1565 MakeBitsHumanReadable(write_thru_file['bytes_per_second'] * 8)) |
| 1566 print 'Parallelism strategy: %s' % write_thru_file['parallelism'] |
1187 | 1567 |
1188 if 'read_throughput' in self.results: | 1568 if 'read_throughput' in self.results: |
1189 print | 1569 print |
1190 print '-' * 78 | 1570 print '-' * 78 |
1191 print 'Read Throughput'.center(78) | 1571 print 'Read Throughput'.center(78) |
1192 print '-' * 78 | 1572 print '-' * 78 |
1193 read_thru = self.results['read_throughput'] | 1573 read_thru = self.results['read_throughput'] |
1194 print 'Copied a %s file %d times for a total transfer size of %s.' % ( | 1574 print 'Copied %s %s file(s) for a total transfer size of %s.' % ( |
| 1575 self.num_objects, |
1195 MakeHumanReadable(read_thru['file_size']), | 1576 MakeHumanReadable(read_thru['file_size']), |
1196 read_thru['num_times'], | |
1197 MakeHumanReadable(read_thru['total_bytes_copied'])) | 1577 MakeHumanReadable(read_thru['total_bytes_copied'])) |
1198 print 'Read throughput: %s/s.' % ( | 1578 print 'Read throughput: %s/s.' % ( |
1199 MakeBitsHumanReadable(read_thru['bytes_per_second'] * 8)) | 1579 MakeBitsHumanReadable(read_thru['bytes_per_second'] * 8)) |
| 1580 print 'Parallelism strategy: %s' % read_thru['parallelism'] |
| 1581 |
| 1582 if 'read_throughput_file' in self.results: |
| 1583 print |
| 1584 print '-' * 78 |
| 1585 print 'Read Throughput With File I/O'.center(78) |
| 1586 print '-' * 78 |
| 1587 read_thru_file = self.results['read_throughput_file'] |
| 1588 print 'Copied %s %s file(s) for a total transfer size of %s.' % ( |
| 1589 self.num_objects, |
| 1590 MakeHumanReadable(read_thru_file['file_size']), |
| 1591 MakeHumanReadable(read_thru_file['total_bytes_copied'])) |
| 1592 print 'Read throughput: %s/s.' % ( |
| 1593 MakeBitsHumanReadable(read_thru_file['bytes_per_second'] * 8)) |
| 1594 print 'Parallelism strategy: %s' % read_thru_file['parallelism'] |
1200 | 1595 |
1201 if 'listing' in self.results: | 1596 if 'listing' in self.results: |
1202 print | 1597 print |
1203 print '-' * 78 | 1598 print '-' * 78 |
1204 print 'Listing'.center(78) | 1599 print 'Listing'.center(78) |
1205 print '-' * 78 | 1600 print '-' * 78 |
1206 | 1601 |
1207 listing = self.results['listing'] | 1602 listing = self.results['listing'] |
1208 insert = listing['insert'] | 1603 insert = listing['insert'] |
1209 delete = listing['delete'] | 1604 delete = listing['delete'] |
(...skipping 172 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1382 val = int(val) | 1777 val = int(val) |
1383 if val < 1: | 1778 if val < 1: |
1384 raise CommandException(msg) | 1779 raise CommandException(msg) |
1385 return val | 1780 return val |
1386 except ValueError: | 1781 except ValueError: |
1387 raise CommandException(msg) | 1782 raise CommandException(msg) |
1388 | 1783 |
1389 def _ParseArgs(self): | 1784 def _ParseArgs(self): |
1390 """Parses arguments for perfdiag command.""" | 1785 """Parses arguments for perfdiag command.""" |
1391 # From -n. | 1786 # From -n. |
1392 self.num_iterations = 5 | 1787 self.num_objects = 5 |
1393 # From -c. | 1788 # From -c. |
1394 self.processes = 1 | 1789 self.processes = 1 |
1395 # From -k. | 1790 # From -k. |
1396 self.threads = 1 | 1791 self.threads = 1 |
| 1792 # From -p |
| 1793 self.parallel_strategy = None |
| 1794 # From -y |
| 1795 self.num_slices = 4 |
1397 # From -s. | 1796 # From -s. |
1398 self.thru_filesize = 1048576 | 1797 self.thru_filesize = 1048576 |
| 1798 # From -d. |
| 1799 self.directory = tempfile.gettempdir() |
| 1800 # Keep track of whether or not to delete the directory upon completion. |
| 1801 self.delete_directory = False |
1399 # From -t. | 1802 # From -t. |
1400 self.diag_tests = self.DEFAULT_DIAG_TESTS | 1803 self.diag_tests = set(self.DEFAULT_DIAG_TESTS) |
1401 # From -o. | 1804 # From -o. |
1402 self.output_file = None | 1805 self.output_file = None |
1403 # From -i. | 1806 # From -i. |
1404 self.input_file = None | 1807 self.input_file = None |
1405 # From -m. | 1808 # From -m. |
1406 self.metadata_keys = {} | 1809 self.metadata_keys = {} |
1407 | 1810 |
1408 if self.sub_opts: | 1811 if self.sub_opts: |
1409 for o, a in self.sub_opts: | 1812 for o, a in self.sub_opts: |
1410 if o == '-n': | 1813 if o == '-n': |
1411 self.num_iterations = self._ParsePositiveInteger( | 1814 self.num_objects = self._ParsePositiveInteger( |
1412 a, 'The -n parameter must be a positive integer.') | 1815 a, 'The -n parameter must be a positive integer.') |
1413 if o == '-c': | 1816 if o == '-c': |
1414 self.processes = self._ParsePositiveInteger( | 1817 self.processes = self._ParsePositiveInteger( |
1415 a, 'The -c parameter must be a positive integer.') | 1818 a, 'The -c parameter must be a positive integer.') |
1416 if o == '-k': | 1819 if o == '-k': |
1417 self.threads = self._ParsePositiveInteger( | 1820 self.threads = self._ParsePositiveInteger( |
1418 a, 'The -k parameter must be a positive integer.') | 1821 a, 'The -k parameter must be a positive integer.') |
| 1822 if o == '-p': |
| 1823 if a.lower() in self.PARALLEL_STRATEGIES: |
| 1824 self.parallel_strategy = a.lower() |
| 1825 else: |
| 1826 raise CommandException( |
| 1827 "'%s' is not a valid parallelism strategy." % a) |
| 1828 if o == '-y': |
| 1829 self.num_slices = self._ParsePositiveInteger( |
| 1830 a, 'The -y parameter must be a positive integer.') |
1419 if o == '-s': | 1831 if o == '-s': |
1420 try: | 1832 try: |
1421 self.thru_filesize = HumanReadableToBytes(a) | 1833 self.thru_filesize = HumanReadableToBytes(a) |
1422 except ValueError: | 1834 except ValueError: |
1423 raise CommandException('Invalid -s parameter.') | 1835 raise CommandException('Invalid -s parameter.') |
1424 if self.thru_filesize > (20 * 1024 ** 3): # Max 20 GiB. | 1836 if o == '-d': |
1425 raise CommandException( | 1837 self.directory = a |
1426 'Maximum throughput file size parameter (-s) is 20 GiB.') | 1838 if not os.path.exists(self.directory): |
| 1839 self.delete_directory = True |
| 1840 os.makedirs(self.directory) |
1427 if o == '-t': | 1841 if o == '-t': |
1428 self.diag_tests = [] | 1842 self.diag_tests = set() |
1429 for test_name in a.strip().split(','): | 1843 for test_name in a.strip().split(','): |
1430 if test_name.lower() not in self.ALL_DIAG_TESTS: | 1844 if test_name.lower() not in self.ALL_DIAG_TESTS: |
1431 raise CommandException("List of test names (-t) contains invalid " | 1845 raise CommandException("List of test names (-t) contains invalid " |
1432 "test name '%s'." % test_name) | 1846 "test name '%s'." % test_name) |
1433 self.diag_tests.append(test_name) | 1847 self.diag_tests.add(test_name) |
1434 if o == '-m': | 1848 if o == '-m': |
1435 pieces = a.split(':') | 1849 pieces = a.split(':') |
1436 if len(pieces) != 2: | 1850 if len(pieces) != 2: |
1437 raise CommandException( | 1851 raise CommandException( |
1438 "Invalid metadata key-value combination '%s'." % a) | 1852 "Invalid metadata key-value combination '%s'." % a) |
1439 key, value = pieces | 1853 key, value = pieces |
1440 self.metadata_keys[key] = value | 1854 self.metadata_keys[key] = value |
1441 if o == '-o': | 1855 if o == '-o': |
1442 self.output_file = os.path.abspath(a) | 1856 self.output_file = os.path.abspath(a) |
1443 if o == '-i': | 1857 if o == '-i': |
1444 self.input_file = os.path.abspath(a) | 1858 self.input_file = os.path.abspath(a) |
1445 if not os.path.isfile(self.input_file): | 1859 if not os.path.isfile(self.input_file): |
1446 raise CommandException("Invalid input file (-i): '%s'." % a) | 1860 raise CommandException("Invalid input file (-i): '%s'." % a) |
1447 try: | 1861 try: |
1448 with open(self.input_file, 'r') as f: | 1862 with open(self.input_file, 'r') as f: |
1449 self.results = json.load(f) | 1863 self.results = json.load(f) |
1450 self.logger.info("Read input file: '%s'.", self.input_file) | 1864 self.logger.info("Read input file: '%s'.", self.input_file) |
1451 except ValueError: | 1865 except ValueError: |
1452 raise CommandException("Could not decode input file (-i): '%s'." % | 1866 raise CommandException("Could not decode input file (-i): '%s'." % |
1453 a) | 1867 a) |
1454 return | 1868 return |
| 1869 |
| 1870 # If parallelism is specified, default parallelism strategy to fan. |
| 1871 if (self.processes > 1 or self.threads > 1) and not self.parallel_strategy: |
| 1872 self.parallel_strategy = self.FAN |
| 1873 elif self.processes == 1 and self.threads == 1 and self.parallel_strategy: |
| 1874 raise CommandException( |
| 1875 'Cannot specify parallelism strategy (-p) without also specifying ' |
| 1876 'multiple threads and/or processes (-c and/or -k).') |
| 1877 |
1455 if not self.args: | 1878 if not self.args: |
1456 self.RaiseWrongNumberOfArgumentsException() | 1879 self.RaiseWrongNumberOfArgumentsException() |
1457 | 1880 |
1458 self.bucket_url = StorageUrlFromString(self.args[0]) | 1881 self.bucket_url = StorageUrlFromString(self.args[0]) |
1459 self.provider = self.bucket_url.scheme | 1882 self.provider = self.bucket_url.scheme |
1460 if not (self.bucket_url.IsCloudUrl() and self.bucket_url.IsBucket()): | 1883 if not (self.bucket_url.IsCloudUrl() and self.bucket_url.IsBucket()):
1461 raise CommandException('The perfdiag command requires a URL that ' | 1884 raise CommandException('The perfdiag command requires a URL that ' |
1462 'specifies a bucket.\n"%s" is not ' | 1885 'specifies a bucket.\n"%s" is not ' |
1463 'valid.' % self.args[0]) | 1886 'valid.' % self.args[0]) |
| 1887 |
| 1888 if (self.thru_filesize > HumanReadableToBytes('2GiB') and |
| 1889 (self.RTHRU in self.diag_tests or self.WTHRU in self.diag_tests)): |
| 1890 raise CommandException( |
| 1891 'For in-memory tests maximum file size is 2GiB. For larger file ' |
| 1892 'sizes, specify rthru_file and/or wthru_file with the -t option.') |
| 1893 |
| 1894 perform_slice = self.parallel_strategy in (self.SLICE, self.BOTH) |
| 1895 slice_not_available = ( |
| 1896 self.provider == 's3' and self.diag_tests.intersection([self.WTHRU,
| 1897 self.WTHRU_FILE]))
| 1898 if perform_slice and slice_not_available: |
| 1899 raise CommandException('Sliced uploads are not available for s3. ' |
| 1900 'Use -p fan or sequential uploads for s3.') |
| 1901 |
1464 # Ensure the bucket exists. | 1902 # Ensure the bucket exists. |
1465 self.gsutil_api.GetBucket(self.bucket_url.bucket_name, | 1903 self.gsutil_api.GetBucket(self.bucket_url.bucket_name, |
1466 provider=self.bucket_url.scheme, | 1904 provider=self.bucket_url.scheme, |
1467 fields=['id']) | 1905 fields=['id']) |
1468 self.exceptions = [httplib.HTTPException, socket.error, socket.gaierror, | 1906 self.exceptions = [httplib.HTTPException, socket.error, socket.gaierror, |
1469 socket.timeout, httplib.BadStatusLine, | 1907 socket.timeout, httplib.BadStatusLine, |
1470 ServiceException] | 1908 ServiceException] |
1471 | 1909 |
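The exception types listed above feed the command's own error accounting (done in _RunOperation, defined earlier in the module and not shown in this excerpt). A generic sketch of that count-and-retry idea, with hypothetical names and a made-up attempt limit:

def run_counting_errors(operation, retryable_exceptions, stats,
                        max_attempts=3):
  """Runs operation(), tallying requests and failures of the given types."""
  for attempt in range(max_attempts):
    stats['total_requests'] = stats.get('total_requests', 0) + 1
    try:
      return operation()
    except tuple(retryable_exceptions):
      stats['request_errors'] = stats.get('request_errors', 0) + 1
      if attempt == max_attempts - 1:
        raise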
1472 # Command entry point. | 1910 # Command entry point. |
1473 def RunCommand(self): | 1911 def RunCommand(self): |
1474 """Called by gsutil when the command is being invoked.""" | 1912 """Called by gsutil when the command is being invoked.""" |
1475 self._ParseArgs() | 1913 self._ParseArgs() |
1476 | 1914 |
1477 if self.input_file: | 1915 if self.input_file: |
1478 self._DisplayResults() | 1916 self._DisplayResults() |
1479 return 0 | 1917 return 0 |
1480 | 1918 |
1481 # We turn off retries in the underlying boto library because the | 1919 # We turn off retries in the underlying boto library because the |
1482 # _RunOperation function handles errors manually so it can count them. | 1920 # _RunOperation function handles errors manually so it can count them. |
1483 boto.config.set('Boto', 'num_retries', '0') | 1921 boto.config.set('Boto', 'num_retries', '0') |
1484 | 1922 |
1485 self.logger.info( | 1923 self.logger.info( |
1486 'Number of iterations to run: %d\n' | 1924 'Number of objects to use: %d\n'
1487 'Base bucket URI: %s\n' | 1925 'Base bucket URI: %s\n' |
1488 'Number of processes: %d\n' | 1926 'Number of processes: %d\n' |
1489 'Number of threads: %d\n' | 1927 'Number of threads: %d\n' |
| 1928 'Parallelism strategy: %s\n' |
1490 'Throughput file size: %s\n' | 1929 'Throughput file size: %s\n' |
1491 'Diagnostics to run: %s', | 1930 'Diagnostics to run: %s', |
1492 self.num_iterations, | 1931 self.num_objects, |
1493 self.bucket_url, | 1932 self.bucket_url, |
1494 self.processes, | 1933 self.processes, |
1495 self.threads, | 1934 self.threads, |
| 1935 self.parallel_strategy, |
1496 MakeHumanReadable(self.thru_filesize), | 1936 MakeHumanReadable(self.thru_filesize), |
1497 (', '.join(self.diag_tests))) | 1937 (', '.join(self.diag_tests))) |
1498 | 1938 |
1499 try: | 1939 try: |
1500 self._SetUp() | 1940 self._SetUp() |
1501 | 1941 |
1502 # Collect generic system info. | 1942 # Collect generic system info. |
1503 self._CollectSysInfo() | 1943 self._CollectSysInfo() |
1504 # Collect netstat info and disk counters before tests (and again later). | 1944 # Collect netstat info and disk counters before tests (and again later). |
1505 netstat_output = self._GetTcpStats() | 1945 netstat_output = self._GetTcpStats() |
1506 if netstat_output: | 1946 if netstat_output: |
1507 self.results['sysinfo']['netstat_start'] = netstat_output | 1947 self.results['sysinfo']['netstat_start'] = netstat_output |
1508 if IS_LINUX: | 1948 if IS_LINUX: |
1509 self.results['sysinfo']['disk_counters_start'] = self._GetDiskCounters() | 1949 self.results['sysinfo']['disk_counters_start'] = self._GetDiskCounters() |
1510 # Record bucket URL. | 1950 # Record bucket URL. |
1511 self.results['bucket_uri'] = str(self.bucket_url) | 1951 self.results['bucket_uri'] = str(self.bucket_url) |
1512 self.results['json_format'] = 'perfdiag' | 1952 self.results['json_format'] = 'perfdiag' |
1513 self.results['metadata'] = self.metadata_keys | 1953 self.results['metadata'] = self.metadata_keys |
1514 | 1954 |
1515 if self.LAT in self.diag_tests: | 1955 if self.LAT in self.diag_tests: |
1516 self._RunLatencyTests() | 1956 self._RunLatencyTests() |
1517 if self.RTHRU in self.diag_tests: | 1957 if self.RTHRU in self.diag_tests: |
1518 self._RunReadThruTests() | 1958 self._RunReadThruTests() |
| 1959 # Run WTHRU_FILE before RTHRU_FILE. If data is created in WTHRU_FILE it |
| 1960 # will be used in RTHRU_FILE to save time and bandwidth. |
| 1961 if self.WTHRU_FILE in self.diag_tests: |
| 1962 self._RunWriteThruTests(use_file=True) |
| 1963 if self.RTHRU_FILE in self.diag_tests: |
| 1964 self._RunReadThruTests(use_file=True) |
1519 if self.WTHRU in self.diag_tests: | 1965 if self.WTHRU in self.diag_tests: |
1520 self._RunWriteThruTests() | 1966 self._RunWriteThruTests() |
1521 if self.LIST in self.diag_tests: | 1967 if self.LIST in self.diag_tests: |
1522 self._RunListTests() | 1968 self._RunListTests() |
1523 | 1969 |
1524 # Collect netstat info and disk counters after tests. | 1970 # Collect netstat info and disk counters after tests. |
1525 netstat_output = self._GetTcpStats() | 1971 netstat_output = self._GetTcpStats() |
1526 if netstat_output: | 1972 if netstat_output: |
1527 self.results['sysinfo']['netstat_end'] = netstat_output | 1973 self.results['sysinfo']['netstat_end'] = netstat_output |
1528 if IS_LINUX: | 1974 if IS_LINUX: |
1529 self.results['sysinfo']['disk_counters_end'] = self._GetDiskCounters() | 1975 self.results['sysinfo']['disk_counters_end'] = self._GetDiskCounters() |
1530 | 1976 |
1531 self.results['total_requests'] = self.total_requests | 1977 self.results['total_requests'] = self.total_requests |
1532 self.results['request_errors'] = self.request_errors | 1978 self.results['request_errors'] = self.request_errors |
1533 self.results['error_responses_by_code'] = self.error_responses_by_code | 1979 self.results['error_responses_by_code'] = self.error_responses_by_code |
1534 self.results['connection_breaks'] = self.connection_breaks | 1980 self.results['connection_breaks'] = self.connection_breaks |
1535 self.results['gsutil_version'] = gslib.VERSION | 1981 self.results['gsutil_version'] = gslib.VERSION |
1536 self.results['boto_version'] = boto.__version__ | 1982 self.results['boto_version'] = boto.__version__ |
1537 | 1983 |
| 1984 self._TearDown() |
1538 self._DisplayResults() | 1985 self._DisplayResults() |
1539 finally: | 1986 finally: |
1540 # TODO: Install signal handlers so this is performed in response to a | 1987 # TODO: Install signal handlers so this is performed in response to a |
1541 # terminating signal; consider multi-threaded object deletes during | 1988 # terminating signal; consider multi-threaded object deletes during |
1542 # cleanup so it happens quickly. | 1989 # cleanup so it happens quickly. |
1543 self._TearDown() | 1990 self._TearDown() |
1544 | 1991 |
1545 return 0 | 1992 return 0 |
1546 | 1993 |
1547 | 1994 |
1548 class UploadObjectTuple(object): | |
1549 """Picklable tuple with necessary metadata for an insert object call.""" | |
1550 | |
1551 def __init__(self, bucket_name, object_name, filepath=None, md5=None, | |
1552 contents=None): | |
1553 """Create an upload tuple. | |
1554 | |
1555 Args: | |
1556 bucket_name: Name of the bucket to upload to. | |
1557 object_name: Name of the object to upload to. | |
1558 filepath: A file path located in self.file_contents and self.file_md5s. | |
1559 md5: The MD5 hash of the object being uploaded. | |
1560 contents: The contents of the file to be uploaded. | |
1561 | |
1562 Note: (contents + md5) and filepath are mutually exclusive. You may specify |
1563 one or the other, but not both. | |
1564 Note: If one of contents or md5 is specified, they must both be specified. |
1565 | |
1566 Raises: | |
1567 InvalidArgument: if the arguments are invalid. | |
1568 """ | |
1569 self.bucket_name = bucket_name | |
1570 self.object_name = object_name | |
1571 self.filepath = filepath | |
1572 self.md5 = md5 | |
1573 self.contents = contents | |
1574 if filepath and (md5 or contents is not None): | |
1575 raise InvalidArgument( | |
1576 'Only one of filepath or (md5 + contents) may be specified.') | |
1577 if not filepath and (not md5 or contents is None): | |
1578 raise InvalidArgument( | |
1579 'Both md5 and contents must be specified.') | |
1580 | |
1581 | |
1582 def StorageUrlToUploadObjectMetadata(storage_url): | 1995 def StorageUrlToUploadObjectMetadata(storage_url): |
1583 if storage_url.IsCloudUrl() and storage_url.IsObject(): | 1996 if storage_url.IsCloudUrl() and storage_url.IsObject(): |
1584 upload_target = apitools_messages.Object() | 1997 upload_target = apitools_messages.Object() |
1585 upload_target.name = storage_url.object_name | 1998 upload_target.name = storage_url.object_name |
1586 upload_target.bucket = storage_url.bucket_name | 1999 upload_target.bucket = storage_url.bucket_name |
1587 return upload_target | 2000 return upload_target |
1588 else: | 2001 else: |
1589 raise CommandException('Non-cloud URL upload target %s was created in ' | 2002 raise CommandException('Non-cloud URL upload target %s was created in ' |
1590 'perfdiag implementation.' % storage_url) | 2003 'perfdiag implementation.' % storage_url)
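For reference, a hedged usage sketch of this helper, assuming the StorageUrlFromString factory used earlier in the command and an example gs:// object URL:

from gslib.storage_url import StorageUrlFromString

url = StorageUrlFromString('gs://example-bucket/perfdiag-object')
upload_target = StorageUrlToUploadObjectMetadata(url)
print upload_target.bucket, upload_target.name  # example-bucket perfdiag-object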