| Index: third_party/gsutil/gslib/tests/test_perfdiag.py | 
| diff --git a/third_party/gsutil/gslib/tests/test_perfdiag.py b/third_party/gsutil/gslib/tests/test_perfdiag.py | 
| index 0f0409d1f4a0f663c7ab4a812933902b34ab78e0..a60155c3da5b3a9f991d8a31895cca638a9f12fa 100644 | 
| --- a/third_party/gsutil/gslib/tests/test_perfdiag.py | 
| +++ b/third_party/gsutil/gslib/tests/test_perfdiag.py | 
| @@ -19,8 +19,11 @@ from __future__ import absolute_import | 
| import os | 
| import socket | 
|  | 
| +import boto | 
| + | 
| import gslib.tests.testcase as testcase | 
| from gslib.tests.util import ObjectToURI as suri | 
| +from gslib.tests.util import RUN_S3_TESTS | 
| from gslib.tests.util import unittest | 
| from gslib.util import IS_WINDOWS | 
|  | 
| @@ -30,21 +33,23 @@ class TestPerfDiag(testcase.GsUtilIntegrationTestCase): | 
|  | 
| # We want to test that perfdiag works both when connecting to the standard gs | 
| # endpoint, and when connecting to a specific IP or host while setting the | 
| -  # host header. For the 2nd case we resolve storage.googleapis.com to a | 
| -  # specific IP and connect to that explicitly. | 
| -  _gs_ip = socket.gethostbyname('storage.googleapis.com') | 
| +  # host header. For the 2nd case we resolve gs_host (normally | 
| +  # storage.googleapis.com) to a specific IP and connect to that explicitly. | 
| +  _gs_host = boto.config.get( | 
| +      'Credentials', 'gs_host', boto.gs.connection.GSConnection.DefaultHost) | 
| +  _gs_ip = socket.gethostbyname(_gs_host) | 
| _custom_endpoint_flags = [ | 
| '-o', 'Credentials:gs_host=' + _gs_ip, | 
| -      '-o', 'Credentials:gs_host_header=storage.googleapis.com', | 
| +      '-o', 'Credentials:gs_host_header=' + _gs_host, | 
| # TODO: gsutil-beta: Add host header support for JSON | 
| '-o', 'Boto:https_validate_certificates=False'] | 
|  | 
| def _should_run_with_custom_endpoints(self): | 
| # Host headers are only supported for XML, and not when | 
| # using environment variables for proxies. | 
| -    return self.test_api == 'XML' and not (os.environ.get('http_proxy') or | 
| -                                           os.environ.get('https_proxy') or | 
| -                                           os.environ.get('HTTPS_PROXY')) | 
| +    return (self.test_api == 'XML' and not RUN_S3_TESTS and not | 
| +            (os.environ.get('http_proxy') or os.environ.get('https_proxy') or | 
| +             os.environ.get('HTTPS_PROXY'))) | 
|  | 
| def test_latency(self): | 
| bucket_uri = self.CreateBucket() | 
| @@ -54,43 +59,73 @@ class TestPerfDiag(testcase.GsUtilIntegrationTestCase): | 
| self.RunGsUtil(self._custom_endpoint_flags + cmd) | 
| self.AssertNObjectsInBucket(bucket_uri, 0, versioned=True) | 
|  | 
| -  def _run_basic_wthru_or_rthru(self, test_name, num_processes, num_threads): | 
| +  def _run_throughput_test(self, test_name, num_processes, num_threads, | 
| +                           parallelism_strategy=None): | 
| bucket_uri = self.CreateBucket() | 
| + | 
| cmd = ['perfdiag', '-n', str(num_processes * num_threads), | 
| -           '-s', '1024', '-c', str(num_processes), | 
| -           '-k', str(num_threads), '-t', test_name, suri(bucket_uri)] | 
| +           '-s', '1024', '-c', str(num_processes), '-k', str(num_threads), | 
| +           '-t', test_name] | 
| +    if parallelism_strategy: | 
| +      cmd += ['-p', parallelism_strategy] | 
| +    cmd += [suri(bucket_uri)] | 
| + | 
| self.RunGsUtil(cmd) | 
| if self._should_run_with_custom_endpoints(): | 
| self.RunGsUtil(self._custom_endpoint_flags + cmd) | 
| self.AssertNObjectsInBucket(bucket_uri, 0, versioned=True) | 
|  | 
| +  def _run_each_parallel_throughput_test(self, test_name, num_processes, | 
| +                                         num_threads): | 
| +    self._run_throughput_test(test_name, num_processes, num_threads, 'fan') | 
| +    if not RUN_S3_TESTS: | 
| +      self._run_throughput_test(test_name, num_processes, num_threads, 'slice') | 
| +      self._run_throughput_test(test_name, num_processes, num_threads, 'both') | 
| + | 
| def test_write_throughput_single_process_single_thread(self): | 
| -    self._run_basic_wthru_or_rthru('wthru', 1, 1) | 
| +    self._run_throughput_test('wthru', 1, 1) | 
| +    self._run_throughput_test('wthru_file', 1, 1) | 
|  | 
| def test_write_throughput_single_process_multi_thread(self): | 
| -    self._run_basic_wthru_or_rthru('wthru', 1, 2) | 
| +    self._run_each_parallel_throughput_test('wthru', 1, 2) | 
| +    self._run_each_parallel_throughput_test('wthru_file', 1, 2) | 
|  | 
| @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | 
| def test_write_throughput_multi_process_single_thread(self): | 
| -    self._run_basic_wthru_or_rthru('wthru', 2, 1) | 
| +    self._run_each_parallel_throughput_test('wthru', 2, 1) | 
| +    self._run_each_parallel_throughput_test('wthru_file', 2, 1) | 
|  | 
| @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | 
| def test_write_throughput_multi_process_multi_thread(self): | 
| -    self._run_basic_wthru_or_rthru('wthru', 2, 2) | 
| +    self._run_each_parallel_throughput_test('wthru', 2, 2) | 
| +    self._run_each_parallel_throughput_test('wthru_file', 2, 2) | 
|  | 
| def test_read_throughput_single_process_single_thread(self): | 
| -    self._run_basic_wthru_or_rthru('rthru', 1, 1) | 
| +    self._run_throughput_test('rthru', 1, 1) | 
| +    self._run_throughput_test('rthru_file', 1, 1) | 
|  | 
| def test_read_throughput_single_process_multi_thread(self): | 
| -    self._run_basic_wthru_or_rthru('rthru', 1, 2) | 
| +    self._run_each_parallel_throughput_test('rthru', 1, 2) | 
| +    self._run_each_parallel_throughput_test('rthru_file', 1, 2) | 
|  | 
| @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | 
| def test_read_throughput_multi_process_single_thread(self): | 
| -    self._run_basic_wthru_or_rthru('rthru', 2, 1) | 
| +    self._run_each_parallel_throughput_test('rthru', 2, 1) | 
| +    self._run_each_parallel_throughput_test('rthru_file', 2, 1) | 
|  | 
| @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | 
| def test_read_throughput_multi_process_multi_thread(self): | 
| -    self._run_basic_wthru_or_rthru('rthru', 2, 2) | 
| +    self._run_each_parallel_throughput_test('rthru', 2, 2) | 
| +    self._run_each_parallel_throughput_test('rthru_file', 2, 2) | 
| + | 
| +  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | 
| +  def test_read_and_write_file_ordering(self): | 
| +    """Tests that rthru_file and wthru_file work when run together.""" | 
| +    self._run_throughput_test('rthru_file,wthru_file', 1, 1) | 
| +    self._run_throughput_test('rthru_file,wthru_file', 2, 2, 'fan') | 
| +    if not RUN_S3_TESTS: | 
| +      self._run_throughput_test('rthru_file,wthru_file', 2, 2, 'slice') | 
| +      self._run_throughput_test('rthru_file,wthru_file', 2, 2, 'both') | 
|  | 
| def test_input_output(self): | 
| outpath = self.CreateTempFile() | 
| @@ -109,7 +144,7 @@ class TestPerfDiag(testcase.GsUtilIntegrationTestCase): | 
| stderr = self.RunGsUtil( | 
| ['perfdiag', '-n', '1', '-s', '3pb', '-t', 'wthru', 'gs://foobar'], | 
| expected_status=1, return_stderr=True) | 
| -    self.assertIn('Maximum throughput file size', stderr) | 
| +    self.assertIn('in-memory tests maximum file size', stderr) | 
|  | 
| def test_listing(self): | 
| bucket_uri = self.CreateBucket() | 
|  |