Index: third_party/gsutil/gslib/tests/testcase/integration_testcase.py
diff --git a/third_party/gsutil/gslib/tests/testcase/integration_testcase.py b/third_party/gsutil/gslib/tests/testcase/integration_testcase.py
new file mode 100644
index 0000000000000000000000000000000000000000..0979c453a4e17565b1b4f6e36cefe6f29ed9e95b
--- /dev/null
+++ b/third_party/gsutil/gslib/tests/testcase/integration_testcase.py
@@ -0,0 +1,472 @@
+# -*- coding: utf-8 -*-
+# Copyright 2013 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Contains gsutil base integration test case class."""
+
+from __future__ import absolute_import
+
+from contextlib import contextmanager
+import cStringIO
+import locale
+import logging
+import os
+import subprocess
+import sys
+import tempfile
+
+import boto
+from boto.exception import StorageResponseError
+from boto.s3.deletemarker import DeleteMarker
+from boto.storage_uri import BucketStorageUri
+
+import gslib
+from gslib.gcs_json_api import GcsJsonApi
+from gslib.hashing_helper import Base64ToHexHash
+from gslib.project_id import GOOG_PROJ_ID_HDR
+from gslib.project_id import PopulateProjectId
+from gslib.tests.testcase import base
+import gslib.tests.util as util
+from gslib.tests.util import ObjectToURI as suri
+from gslib.tests.util import RUN_S3_TESTS
+from gslib.tests.util import SetBotoConfigFileForTest
+from gslib.tests.util import SetBotoConfigForTest
+from gslib.tests.util import SetEnvironmentForTest
+from gslib.tests.util import unittest
+import gslib.third_party.storage_apitools.storage_v1_messages as apitools_messages
+from gslib.util import IS_WINDOWS
+from gslib.util import Retry
+from gslib.util import UTF8
+
+
+LOGGER = logging.getLogger('integration-test')
+
+# Contents of boto config file that will tell gsutil not to override the real
+# error message with a warning about anonymous access if no credentials are
+# provided in the config file. Also, because we retry 401s, reduce the number
+# of retries so we don't go through a long exponential backoff in tests.
+BOTO_CONFIG_CONTENTS_IGNORE_ANON_WARNING = """
+[Boto]
+num_retries = 2
+[Tests]
+bypass_anonymous_access_warning = True
+"""
+
+
+def SkipForGS(reason):
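+  """Decorator that skips the test when the suite is running against GS."""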
+  if not RUN_S3_TESTS:
+    return unittest.skip(reason)
+  else:
+    return lambda func: func
+
+
+def SkipForS3(reason):
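+  """Decorator that skips the test when the suite is running against S3."""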
+  if RUN_S3_TESTS:
+    return unittest.skip(reason)
+  else:
+    return lambda func: func
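+
+# Example usage of the skip decorators above (illustrative; the test name and
+# reason string are hypothetical):
+#   @SkipForS3('S3 does not support this gsutil feature')
+#   def test_gs_specific_behavior(self):
+#     ...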
+
+
+# TODO: Right now, most tests use the XML API. Instead, they should respect
+# prefer_api in the same way that commands do.
+@unittest.skipUnless(util.RUN_INTEGRATION_TESTS,
+                     'Not running integration tests.')
+class GsUtilIntegrationTestCase(base.GsUtilTestCase):
+  """Base class for gsutil integration tests."""
+  GROUP_TEST_ADDRESS = 'gs-discussion@googlegroups.com'
+  GROUP_TEST_ID = (
+      '00b4903a97d097895ab58ef505d535916a712215b79c3e54932c2eb502ad97f5')
+  USER_TEST_ADDRESS = 'gs-team@google.com'
+  USER_TEST_ID = (
+      '00b4903a9703325c6bfc98992d72e75600387a64b3b6bee9ef74613ef8842080')
+  DOMAIN_TEST = 'google.com'
+  # No one can create this bucket without owning the gmail.com domain, and we
+  # won't create it ourselves, so it shouldn't exist.
+  # It would be nice to use google.com here, but the JSON API disallows
+  # 'google' in resource IDs.
+  nonexistent_bucket_name = 'nonexistent-bucket-foobar.gmail.com'
+
+  def setUp(self):
+    """Creates base configuration for integration tests."""
+    super(GsUtilIntegrationTestCase, self).setUp()
+    self.bucket_uris = []
+
+    # Set up API version and project ID handler.
+    self.api_version = boto.config.get_value(
+        'GSUtil', 'default_api_version', '1')
+
+    # Instantiate a JSON API for use by the current integration test.
+    self.json_api = GcsJsonApi(BucketStorageUri, logging.getLogger(),
+                               'gs')
+
+    if util.RUN_S3_TESTS:
+      self.nonexistent_bucket_name = (
+          'nonexistentbucket-asf801rj3r9as90mfnnkjxpo02')
+
+  # Retry with an exponential backoff if a server error is received. This
+  # ensures that we try *really* hard to clean up after ourselves.
+  # TODO: As long as we're still using boto to do the teardown,
+  # we decorate with boto exceptions. Eventually this should be migrated
+  # to CloudApi exceptions.
+  @Retry(StorageResponseError, tries=7, timeout_secs=1)
+  def tearDown(self):
+    super(GsUtilIntegrationTestCase, self).tearDown()
+
+    while self.bucket_uris:
+      bucket_uri = self.bucket_uris[-1]
+      try:
+        bucket_list = self._ListBucket(bucket_uri)
+      except StorageResponseError, e:
+        # This can happen for tests of rm -r command, which for bucket-only
+        # URIs delete the bucket at the end.
+        if e.status == 404:
+          self.bucket_uris.pop()
+          continue
+        else:
+          raise
+      while bucket_list:
+        error = None
+        for k in bucket_list:
+          try:
+            if isinstance(k, DeleteMarker):
+              bucket_uri.get_bucket().delete_key(k.name,
+                                                 version_id=k.version_id)
+            else:
+              k.delete()
+          except StorageResponseError, e:
+            # This could happen if objects that have already been deleted are
+            # still showing up in the listing due to eventual consistency. In
+            # that case, we continue on until we've tried to delete every
+            # object in the listing before raising the error on which to
+            # retry.
+            if e.status == 404:
+              error = e
+            else:
+              raise
+        if error:
+          raise error  # pylint: disable=raising-bad-type
+        bucket_list = self._ListBucket(bucket_uri)
+      bucket_uri.delete_bucket()
+      self.bucket_uris.pop()
+
+  def _ListBucket(self, bucket_uri):
+    if bucket_uri.scheme == 's3':
+      # storage_uri will omit delete markers from bucket listings, but
+      # these must be deleted before we can remove an S3 bucket.
+      return list(v for v in bucket_uri.get_bucket().list_versions())
+    return list(bucket_uri.list_bucket(all_versions=True))
+
+  def AssertNObjectsInBucket(self, bucket_uri, num_objects, versioned=False):
+    """Checks (with retries) that 'ls bucket_uri/**' returns num_objects.
+
+    This is a common test pattern to deal with eventual listing consistency for
+    tests that rely on a set of objects to be listed.
+
+    Args:
+      bucket_uri: storage_uri for the bucket.
+      num_objects: number of objects expected in the bucket.
+      versioned: If True, perform a versioned listing.
+
+    Raises:
+      AssertionError if number of objects does not match expected value.
+
+    Returns:
+      Listing split across lines.
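+
+    Example (illustrative; assumes a bucket holding two objects):
+      listing = self.AssertNObjectsInBucket(bucket_uri, 2)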
+    """
+    # Use @Retry as hedge against bucket listing eventual consistency.
+    @Retry(AssertionError, tries=5, timeout_secs=1)
+    def _Check1():
+      command = ['ls', '-a'] if versioned else ['ls']
+      b_uri = [suri(bucket_uri) + '/**'] if num_objects else [suri(bucket_uri)]
+      listing = self.RunGsUtil(command + b_uri, return_stdout=True).split('\n')
+      # num_objects + one trailing newline.
+      self.assertEquals(len(listing), num_objects + 1)
+      return listing
+    return _Check1()
+
+  def CreateBucket(self, bucket_name=None, test_objects=0, storage_class=None,
+                   provider=None, prefer_json_api=False):
+    """Creates a test bucket.
+
+    The bucket and all of its contents will be deleted after the test.
+
+    Args:
+      bucket_name: Create the bucket with this name. If not provided, a
+                   temporary test bucket name is constructed.
+      test_objects: The number of objects that should be placed in the bucket.
+                    Defaults to 0.
+      storage_class: Storage class to use. If not provided, the standard
+                     storage class is used.
+      provider: Provider to use - either "gs" (the default) or "s3".
+      prefer_json_api: If true, use the JSON creation functions where possible.
+
+    Returns:
+      StorageUri for the created bucket.
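+
+    Example (illustrative):
+      bucket_uri = self.CreateBucket(test_objects=2)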
+    """
+    if not provider:
+      provider = self.default_provider
+
+    if prefer_json_api and provider == 'gs':
+      json_bucket = self.CreateBucketJson(bucket_name=bucket_name,
+                                          test_objects=test_objects,
+                                          storage_class=storage_class)
+      bucket_uri = boto.storage_uri(
+          'gs://%s' % json_bucket.name.encode(UTF8).lower(),
+          suppress_consec_slashes=False)
+      self.bucket_uris.append(bucket_uri)
+      return bucket_uri
+
+    bucket_name = bucket_name or self.MakeTempName('bucket')
+
+    bucket_uri = boto.storage_uri('%s://%s' % (provider, bucket_name.lower()),
+                                  suppress_consec_slashes=False)
+
+    if provider == 'gs':
+      # Apply API version and project ID headers if necessary.
+      headers = {'x-goog-api-version': self.api_version}
+      headers[GOOG_PROJ_ID_HDR] = PopulateProjectId()
+    else:
+      headers = {}
+
+    # Parallel tests can easily run into bucket creation quotas.
+    # Retry with exponential backoff so that we create them as fast as we
+    # reasonably can.
+    @Retry(StorageResponseError, tries=7, timeout_secs=1)
+    def _CreateBucketWithExponentialBackoff():
+      bucket_uri.create_bucket(storage_class=storage_class, headers=headers)
+
+    _CreateBucketWithExponentialBackoff()
+    self.bucket_uris.append(bucket_uri)
+    for i in range(test_objects):
+      self.CreateObject(bucket_uri=bucket_uri,
+                        object_name=self.MakeTempName('obj'),
+                        contents='test %d' % i)
+    return bucket_uri
+
+  def CreateVersionedBucket(self, bucket_name=None, test_objects=0):
+    """Creates a versioned test bucket.
+
+    The bucket and all of its contents will be deleted after the test.
+
+    Args:
+      bucket_name: Create the bucket with this name. If not provided, a
+                   temporary test bucket name is constructed.
+      test_objects: The number of objects that should be placed in the bucket.
+                    Defaults to 0.
+
+    Returns:
+      StorageUri for the created bucket with versioning enabled.
+    """
+    bucket_uri = self.CreateBucket(bucket_name=bucket_name,
+                                   test_objects=test_objects)
+    bucket_uri.configure_versioning(True)
+    return bucket_uri
+
+  def CreateObject(self, bucket_uri=None, object_name=None, contents=None,
+                   prefer_json_api=False):
+    """Creates a test object.
+
+    Args:
+      bucket_uri: The URI of the bucket to place the object in. If not
+                  specified, a new temporary bucket is created.
+      object_name: The name to use for the object. If not specified, a
+                   temporary test object name is constructed.
+      contents: The contents to write to the object. If not specified, the key
+                is not written, which means the object is not actually
+                created on the server yet.
+      prefer_json_api: If true, use the JSON creation functions where possible.
+
+    Returns:
+      A StorageUri for the created object.
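+
+    Example (illustrative; creates a temporary bucket when none is given):
+      key_uri = self.CreateObject(contents='data')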
+    """
+    bucket_uri = bucket_uri or self.CreateBucket()
+
+    if prefer_json_api and bucket_uri.scheme == 'gs' and contents:
+      object_name = object_name or self.MakeTempName('obj')
+      json_object = self.CreateObjectJson(contents=contents,
+                                          bucket_name=bucket_uri.bucket_name,
+                                          object_name=object_name)
+      object_uri = bucket_uri.clone_replace_name(object_name)
+      # pylint: disable=protected-access
+      # Need to update the StorageUri with the correct values while
+      # avoiding creating a versioned string.
+      object_uri._update_from_values(None,
+                                     json_object.generation,
+                                     True,
+                                     md5=(Base64ToHexHash(json_object.md5Hash),
+                                          json_object.md5Hash.strip('\n"\'')))
+      # pylint: enable=protected-access
+      return object_uri
+
+    object_name = object_name or self.MakeTempName('obj')
+    key_uri = bucket_uri.clone_replace_name(object_name)
+    if contents is not None:
+      key_uri.set_contents_from_string(contents)
+    return key_uri
+
+  def CreateBucketJson(self, bucket_name=None, test_objects=0,
+                       storage_class=None):
+    """Creates a test bucket using the JSON API.
+
+    The bucket and all of its contents will be deleted after the test.
+
+    Args:
+      bucket_name: Create the bucket with this name. If not provided, a
+                   temporary test bucket name is constructed.
+      test_objects: The number of objects that should be placed in the bucket.
+                    Defaults to 0.
+      storage_class: Storage class to use. If not provided, the standard
+                     storage class is used.
+
+    Returns:
+      Apitools Bucket for the created bucket.
+    """
+    bucket_name = bucket_name or self.MakeTempName('bucket')
+    bucket_metadata = None
+    if storage_class:
+      bucket_metadata = apitools_messages.Bucket(
+          name=bucket_name.lower(),
+          storageClass=storage_class)
+
+    # TODO: Add retry and exponential backoff.
+    bucket = self.json_api.CreateBucket(bucket_name.lower(),
+                                        metadata=bucket_metadata)
+    # Add bucket to list of buckets to be cleaned up.
+    # TODO: Clean up JSON buckets using JSON API.
+    self.bucket_uris.append(
+        boto.storage_uri('gs://%s' % (bucket_name.lower()),
+                         suppress_consec_slashes=False))
+    for i in range(test_objects):
+      self.CreateObjectJson(bucket_name=bucket_name,
+                            object_name=self.MakeTempName('obj'),
+                            contents='test %d' % i)
+    return bucket
+
+  def CreateObjectJson(self, contents, bucket_name=None, object_name=None):
+    """Creates a test object (GCS provider only) using the JSON API.
+
+    Args:
+      contents: The contents to write to the object.
+      bucket_name: Name of bucket to place the object in. If not
+                   specified, a new temporary bucket is created.
+      object_name: The name to use for the object. If not specified, a
+                   temporary test object name is constructed.
+
+    Returns:
+      An apitools Object for the created object.
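+
+    Example (illustrative; creates a temporary bucket when none is given):
+      obj = self.CreateObjectJson(contents='data')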
+    """
+    bucket_name = bucket_name or self.CreateBucketJson().name
+    object_name = object_name or self.MakeTempName('obj')
+    object_metadata = apitools_messages.Object(
+        name=object_name,
+        bucket=bucket_name,
+        contentType='application/octet-stream')
+    return self.json_api.UploadObject(cStringIO.StringIO(contents),
+                                      object_metadata, provider='gs')
+
+  def RunGsUtil(self, cmd, return_status=False, return_stdout=False,
+                return_stderr=False, expected_status=0, stdin=None):
+    """Runs the gsutil command.
+
+    Args:
+      cmd: The command to run, as a list, e.g. ['cp', 'foo', 'bar']
+      return_status: If True, the exit status code is returned.
+      return_stdout: If True, the standard output of the command is returned.
+      return_stderr: If True, the standard error of the command is returned.
+      expected_status: The expected return code. If not specified, defaults to
+                       0. If the return code is a different value, an exception
+                       is raised.
+      stdin: A string of data to pipe to the process as standard input.
+
+    Returns:
+      A tuple containing the desired return values specified by the return_*
+      arguments.
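+
+    Example (illustrative; assumes bucket_uri from CreateBucket):
+      stdout = self.RunGsUtil(['ls', suri(bucket_uri)], return_stdout=True)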
+    """
+    cmd = ([gslib.GSUTIL_PATH] + ['--testexceptiontraces'] +
+           ['-o', 'GSUtil:default_project_id=' + PopulateProjectId()] +
+           cmd)
+    if IS_WINDOWS:
+      cmd = [sys.executable] + cmd
+    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+                         stdin=subprocess.PIPE)
+    (stdout, stderr) = p.communicate(stdin)
+    status = p.returncode
+
+    if expected_status is not None:
+      self.assertEqual(
+          status, expected_status,
+          msg='Expected status %d, got %d.\nCommand:\n%s\n\nstderr:\n%s' % (
+              expected_status, status, ' '.join(cmd), stderr))
+
+    toreturn = []
+    if return_status:
+      toreturn.append(status)
+    if return_stdout:
+      if IS_WINDOWS:
+        stdout = stdout.replace('\r\n', '\n')
+      toreturn.append(stdout)
+    if return_stderr:
+      if IS_WINDOWS:
+        stderr = stderr.replace('\r\n', '\n')
+      toreturn.append(stderr)
+
+    if len(toreturn) == 1:
+      return toreturn[0]
+    elif toreturn:
+      return tuple(toreturn)
+
+  def RunGsUtilTabCompletion(self, cmd, expected_results=None):
+    """Runs the gsutil command in tab completion mode.
+
+    Args:
+      cmd: The command to run, as a list, e.g. ['cp', 'foo', 'bar']
+      expected_results: The expected tab completion results for the given
+                        input.
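+
+    Example (illustrative; the bucket name shown is hypothetical):
+      self.RunGsUtilTabCompletion(['ls', 'gs://prefi'],
+                                  expected_results=['gs://prefix-bucket/'])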
+    """
+    cmd = [gslib.GSUTIL_PATH] + ['--testexceptiontraces'] + cmd
+    cmd_str = ' '.join(cmd)
+
+    @Retry(AssertionError, tries=5, timeout_secs=1)
+    def _RunTabCompletion():
+      """Runs the tab completion operation with retries."""
+      results_string = None
+      with tempfile.NamedTemporaryFile(
+          delete=False) as tab_complete_result_file:
+        # argcomplete returns results via file descriptor 8, so we redirect
+        # that descriptor to a file in order to capture them.
+        cmd_str_with_result_redirect = '%s 8>%s' % (
+            cmd_str, tab_complete_result_file.name)
+        env = os.environ.copy()
+        env['_ARGCOMPLETE'] = '1'
+        env['COMP_LINE'] = cmd_str
+        env['COMP_POINT'] = str(len(cmd_str))
+        subprocess.call(cmd_str_with_result_redirect, env=env, shell=True)
+        results_string = tab_complete_result_file.read().decode(
+            locale.getpreferredencoding())
+      if results_string:
+        results = results_string.split('\013')
+      else:
+        results = []
+      self.assertEqual(results, expected_results)
+
+    # When tests are run in parallel, tab completion could take a long time,
+    # so choose a long timeout value.
+    with SetBotoConfigForTest([('GSUtil', 'tab_completion_timeout', '120')]):
+      _RunTabCompletion()
+
+  @contextmanager
+  def SetAnonymousBotoCreds(self):
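+    """Context manager that runs the enclosed block with no credentials."""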
+    boto_config_path = self.CreateTempFile(
+        contents=BOTO_CONFIG_CONTENTS_IGNORE_ANON_WARNING)
+    with SetBotoConfigFileForTest(boto_config_path):
+      # Make sure to reset Developer Shell credential port so that the child
+      # gsutil process is really anonymous.
+      with SetEnvironmentForTest({'DEVSHELL_CLIENT_PORT': None}):
+        yield