Index: third_party/gsutil/gslib/tests/test_command_runner.py |
diff --git a/third_party/gsutil/gslib/tests/test_command_runner.py b/third_party/gsutil/gslib/tests/test_command_runner.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..bf4d037d97f118cc0345db9f45b5719921e94f2e |
--- /dev/null |
+++ b/third_party/gsutil/gslib/tests/test_command_runner.py |
@@ -0,0 +1,422 @@ |
+# -*- coding: utf-8 -*- |
+# Copyright 2011 Google Inc. All Rights Reserved. |
+# |
+# Licensed under the Apache License, Version 2.0 (the "License"); |
+# you may not use this file except in compliance with the License. |
+# You may obtain a copy of the License at |
+# |
+# http://www.apache.org/licenses/LICENSE-2.0 |
+# |
+# Unless required by applicable law or agreed to in writing, software |
+# distributed under the License is distributed on an "AS IS" BASIS, |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
+# See the License for the specific language governing permissions and |
+# limitations under the License. |
+"""Unit and integration tests for gsutil command_runner module.""" |
+ |
+from __future__ import absolute_import |
+ |
+import logging |
+import os |
+import time |
+ |
+import gslib |
+from gslib import command_runner |
+from gslib.command import Command |
+from gslib.command_argument import CommandArgument |
+from gslib.command_runner import CommandRunner |
+from gslib.command_runner import HandleArgCoding |
+from gslib.exception import CommandException |
+from gslib.tab_complete import CloudObjectCompleter |
+from gslib.tab_complete import CloudOrLocalObjectCompleter |
+from gslib.tab_complete import LocalObjectCompleter |
+from gslib.tab_complete import LocalObjectOrCannedACLCompleter |
+from gslib.tab_complete import NoOpCompleter |
+import gslib.tests.testcase as testcase |
+import gslib.tests.util as util |
+from gslib.tests.util import ARGCOMPLETE_AVAILABLE |
+from gslib.tests.util import SetBotoConfigFileForTest |
+from gslib.tests.util import SetBotoConfigForTest |
+from gslib.tests.util import unittest |
+from gslib.util import GSUTIL_PUB_TARBALL |
+from gslib.util import SECONDS_PER_DAY |
+ |
+ |
+class FakeArgparseArgument(object): |
+ """Fake for argparse parser argument.""" |
+ pass |
+ |
+ |
+class FakeArgparseParser(object): |
+ """Fake for argparse parser.""" |
+ |
+ def __init__(self): |
+ self.arguments = [] |
+ |
+ def add_argument(self, *unused_args, **unused_kwargs): |
+ argument = FakeArgparseArgument() |
+ self.arguments.append(argument) |
+ return argument |
+ |
+ |
+class FakeArgparseSubparsers(object): |
+ """Container for nested parsers.""" |
+ |
+ def __init__(self): |
+ self.parsers = [] |
+ |
+ def add_parser(self, unused_name, **unused_kwargs): |
+ parser = FakeArgparseParser() |
+ self.parsers.append(parser) |
+ return parser |
+ |
+ |
+class FakeCommandWithInvalidCompleter(Command): |
+ """Command with an invalid completer on an argument.""" |
+ |
+ command_spec = Command.CreateCommandSpec( |
+ 'fake1', |
+ argparse_arguments=[ |
+ CommandArgument('arg', completer='BAD') |
+ ] |
+ ) |
+ |
+ help_spec = Command.HelpSpec( |
+ help_name='fake1', |
+ help_name_aliases=[], |
+ help_type='command_help', |
+ help_one_line_summary='fake command for tests', |
+ help_text='fake command for tests', |
+ subcommand_help_text={} |
+ ) |
+ |
+ def __init__(self): |
+ pass |
+ |
+ |
+class FakeCommandWithCompleters(Command): |
+ """Command with various completer types.""" |
+ |
+ command_spec = Command.CreateCommandSpec( |
+ 'fake2', |
+ argparse_arguments=[ |
+ CommandArgument.MakeZeroOrMoreCloudURLsArgument(), |
+ CommandArgument.MakeZeroOrMoreFileURLsArgument(), |
+ CommandArgument.MakeZeroOrMoreCloudOrFileURLsArgument(), |
+ CommandArgument.MakeFreeTextArgument(), |
+ CommandArgument.MakeZeroOrMoreCloudBucketURLsArgument(), |
+ CommandArgument.MakeFileURLOrCannedACLArgument(), |
+ ] |
+ ) |
+ |
+ help_spec = Command.HelpSpec( |
+ help_name='fake2', |
+ help_name_aliases=[], |
+ help_type='command_help', |
+ help_one_line_summary='fake command for tests', |
+ help_text='fake command for tests', |
+ subcommand_help_text={} |
+ ) |
+ |
+ def __init__(self): |
+ pass |
+ |
+ |
+class TestCommandRunnerUnitTests( |
+ testcase.unit_testcase.GsUtilUnitTestCase): |
+ """Unit tests for gsutil update check in command_runner module.""" |
+ |
+ def setUp(self): |
+ """Sets up the command runner mock objects.""" |
+ super(TestCommandRunnerUnitTests, self).setUp() |
+ |
+ # Mock out the timestamp file so we can manipulate it. |
+ self.previous_update_file = ( |
+ command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE) |
+ self.timestamp_file = self.CreateTempFile() |
+ command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE = ( |
+ self.timestamp_file) |
+ |
+ # Mock out the gsutil version checker. |
+ base_version = unicode(gslib.VERSION) |
+ while not base_version.isnumeric(): |
+ if not base_version: |
+ raise CommandException( |
+ 'Version number (%s) is not numeric.' % gslib.VERSION) |
+ base_version = base_version[:-1] |
+ command_runner.LookUpGsutilVersion = lambda u, v: float(base_version) + 1 |
+ |
+ # Mock out raw_input to trigger yes prompt. |
+ command_runner.raw_input = lambda p: 'y' |
+ |
+ # Mock out TTY check to pretend we're on a TTY even if we're not. |
+ self.running_interactively = True |
+ command_runner.IsRunningInteractively = lambda: self.running_interactively |
+ |
+ # Mock out the modified time of the VERSION file. |
+ self.version_mod_time = 0 |
+ self.previous_version_mod_time = command_runner.GetGsutilVersionModifiedTime |
+ command_runner.GetGsutilVersionModifiedTime = lambda: self.version_mod_time |
+ |
+ # Create a fake pub tarball that will be used to check for gsutil version. |
+ self.pub_bucket_uri = self.CreateBucket('pub') |
+ self.gsutil_tarball_uri = self.CreateObject( |
+ bucket_uri=self.pub_bucket_uri, object_name='gsutil.tar.gz', |
+ contents='foo') |
+ |
+ def tearDown(self): |
+ """Tears down the command runner mock objects.""" |
+ super(TestCommandRunnerUnitTests, self).tearDown() |
+ |
+ command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE = ( |
+ self.previous_update_file) |
+ command_runner.LookUpGsutilVersion = gslib.util.LookUpGsutilVersion |
+ command_runner.raw_input = raw_input |
+ |
+ command_runner.GetGsutilVersionModifiedTime = self.previous_version_mod_time |
+ |
+ command_runner.IsRunningInteractively = gslib.util.IsRunningInteractively |
+ |
+ self.gsutil_tarball_uri.delete_key() |
+ self.pub_bucket_uri.delete_bucket() |
+ |
+ def _IsPackageOrCloudSDKInstall(self): |
+ # Update should not trigger for package installs or Cloud SDK installs. |
+ return (gslib.IS_PACKAGE_INSTALL or |
+ os.environ.get('CLOUDSDK_WRAPPER') == '1') |
+ |
+ @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config') |
+ def test_not_interactive(self): |
+ """Tests that update is not triggered if not running interactively.""" |
+ with SetBotoConfigForTest([ |
+ ('GSUtil', 'software_update_check_period', '1')]): |
+ with open(self.timestamp_file, 'w') as f: |
+ f.write(str(int(time.time() - 2 * SECONDS_PER_DAY))) |
+ self.running_interactively = False |
+ self.assertEqual( |
+ False, |
+ self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0)) |
+ |
+ @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config') |
+ def test_no_tracker_file_version_recent(self): |
+ """Tests when no timestamp file exists and VERSION file is recent.""" |
+ if os.path.exists(self.timestamp_file): |
+ os.remove(self.timestamp_file) |
+ self.assertFalse(os.path.exists(self.timestamp_file)) |
+ self.version_mod_time = time.time() |
+ self.assertEqual( |
+ False, |
+ self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0)) |
+ |
+ @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config') |
+ def test_no_tracker_file_version_old(self): |
+ """Tests when no timestamp file exists and VERSION file is old.""" |
+ if os.path.exists(self.timestamp_file): |
+ os.remove(self.timestamp_file) |
+ self.assertFalse(os.path.exists(self.timestamp_file)) |
+ self.version_mod_time = 0 |
+ expected = not self._IsPackageOrCloudSDKInstall() |
+ self.assertEqual( |
+ expected, |
+ self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0)) |
+ |
+ @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config') |
+ def test_invalid_commands(self): |
+ """Tests that update is not triggered for certain commands.""" |
+ self.assertEqual( |
+ False, |
+ self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('update', 0)) |
+ |
+ @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config') |
+ def test_invalid_file_contents(self): |
+ """Tests no update if timestamp file has invalid value.""" |
+ with open(self.timestamp_file, 'w') as f: |
+ f.write('NaN') |
+ self.assertEqual( |
+ False, |
+ self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0)) |
+ |
+ @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config') |
+ def test_update_should_trigger(self): |
+ """Tests update should be triggered if time is up.""" |
+ with SetBotoConfigForTest([ |
+ ('GSUtil', 'software_update_check_period', '1')]): |
+ with open(self.timestamp_file, 'w') as f: |
+ f.write(str(int(time.time() - 2 * SECONDS_PER_DAY))) |
+ expected = not self._IsPackageOrCloudSDKInstall() |
+ self.assertEqual( |
+ expected, |
+ self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0)) |
+ |
+ @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config') |
+ def test_not_time_for_update_yet(self): |
+ """Tests update not triggered if not time yet.""" |
+ with SetBotoConfigForTest([ |
+ ('GSUtil', 'software_update_check_period', '3')]): |
+ with open(self.timestamp_file, 'w') as f: |
+ f.write(str(int(time.time() - 2 * SECONDS_PER_DAY))) |
+ self.assertEqual( |
+ False, |
+ self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0)) |
+ |
+ def test_user_says_no_to_update(self): |
+ """Tests no update triggered if user says no at the prompt.""" |
+ with SetBotoConfigForTest([ |
+ ('GSUtil', 'software_update_check_period', '1')]): |
+ with open(self.timestamp_file, 'w') as f: |
+ f.write(str(int(time.time() - 2 * SECONDS_PER_DAY))) |
+ command_runner.raw_input = lambda p: 'n' |
+ self.assertEqual( |
+ False, |
+ self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0)) |
+ |
+ @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config') |
+ def test_update_check_skipped_with_quiet_mode(self): |
+ """Tests that update isn't triggered when loglevel is in quiet mode.""" |
+ with SetBotoConfigForTest([ |
+ ('GSUtil', 'software_update_check_period', '1')]): |
+ with open(self.timestamp_file, 'w') as f: |
+ f.write(str(int(time.time() - 2 * SECONDS_PER_DAY))) |
+ |
+ expected = not self._IsPackageOrCloudSDKInstall() |
+ self.assertEqual( |
+ expected, |
+ self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0)) |
+ |
+ prev_loglevel = logging.getLogger().getEffectiveLevel() |
+ try: |
+ logging.getLogger().setLevel(logging.ERROR) |
+ # With reduced loglevel, should return False. |
+ self.assertEqual( |
+ False, |
+ self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0)) |
+ finally: |
+ logging.getLogger().setLevel(prev_loglevel) |
+ |
+ def test_command_argument_parser_setup_invalid_completer(self): |
+ |
+ command_map = { |
+ FakeCommandWithInvalidCompleter.command_spec.command_name: |
+ FakeCommandWithInvalidCompleter() |
+ } |
+ |
+ runner = CommandRunner( |
+ bucket_storage_uri_class=self.mock_bucket_storage_uri, |
+ gsutil_api_class_map_factory=self.mock_gsutil_api_class_map_factory, |
+ command_map=command_map) |
+ |
+ subparsers = FakeArgparseSubparsers() |
+ try: |
+ runner.ConfigureCommandArgumentParsers(subparsers) |
+ except RuntimeError as e: |
+ self.assertIn('Unknown completer', e.message) |
+ |
+ @unittest.skipUnless(ARGCOMPLETE_AVAILABLE, |
+ 'Tab completion requires argcomplete') |
+ def test_command_argument_parser_setup_completers(self): |
+ |
+ command_map = { |
+ FakeCommandWithCompleters.command_spec.command_name: |
+ FakeCommandWithCompleters() |
+ } |
+ |
+ runner = CommandRunner( |
+ bucket_storage_uri_class=self.mock_bucket_storage_uri, |
+ gsutil_api_class_map_factory=self.mock_gsutil_api_class_map_factory, |
+ command_map=command_map) |
+ |
+ subparsers = FakeArgparseSubparsers() |
+ runner.ConfigureCommandArgumentParsers(subparsers) |
+ |
+ self.assertEqual(1, len(subparsers.parsers)) |
+ parser = subparsers.parsers[0] |
+ self.assertEqual(6, len(parser.arguments)) |
+ self.assertEqual(CloudObjectCompleter, type(parser.arguments[0].completer)) |
+ self.assertEqual(LocalObjectCompleter, type(parser.arguments[1].completer)) |
+ self.assertEqual( |
+ CloudOrLocalObjectCompleter, type(parser.arguments[2].completer)) |
+ self.assertEqual( |
+ NoOpCompleter, type(parser.arguments[3].completer)) |
+ self.assertEqual(CloudObjectCompleter, type(parser.arguments[4].completer)) |
+ self.assertEqual( |
+ LocalObjectOrCannedACLCompleter, type(parser.arguments[5].completer)) |
+ |
+ # pylint: disable=invalid-encoded-data |
+ def test_valid_arg_coding(self): |
+ """Tests that gsutil encodes valid args correctly.""" |
+ # Args other than -h and -p should be utf-8 decoded. |
+ args = HandleArgCoding(['ls', '-l']) |
+ self.assertIs(type(args[0]), unicode) |
+ self.assertIs(type(args[1]), unicode) |
+ |
+ # -p and -h args other than x-goog-meta should not be decoded. |
+ args = HandleArgCoding(['ls', '-p', 'abc:def', 'gs://bucket']) |
+ self.assertIs(type(args[0]), unicode) |
+ self.assertIs(type(args[1]), unicode) |
+ self.assertIsNot(type(args[2]), unicode) |
+ self.assertIs(type(args[3]), unicode) |
+ |
+ args = HandleArgCoding(['gsutil', '-h', 'content-type:text/plain', 'cp', |
+ 'a', 'gs://bucket']) |
+ self.assertIs(type(args[0]), unicode) |
+ self.assertIs(type(args[1]), unicode) |
+ self.assertIsNot(type(args[2]), unicode) |
+ self.assertIs(type(args[3]), unicode) |
+ self.assertIs(type(args[4]), unicode) |
+ self.assertIs(type(args[5]), unicode) |
+ |
+ # -h x-goog-meta args should be decoded. |
+ args = HandleArgCoding(['gsutil', '-h', 'x-goog-meta-abc', '1234']) |
+ self.assertIs(type(args[0]), unicode) |
+ self.assertIs(type(args[1]), unicode) |
+ self.assertIs(type(args[2]), unicode) |
+ self.assertIs(type(args[3]), unicode) |
+ |
+ # -p and -h args with non-ASCII content should raise CommandException. |
+ try: |
+ HandleArgCoding(['ls', '-p', '碼']) |
+ # Ensure exception is raised. |
+ self.assertTrue(False) |
+ except CommandException as e: |
+ self.assertIn('Invalid non-ASCII header', e.reason) |
+ try: |
+ HandleArgCoding(['-h', '碼', 'ls']) |
+ # Ensure exception is raised. |
+ self.assertTrue(False) |
+ except CommandException as e: |
+ self.assertIn('Invalid non-ASCII header', e.reason) |
+ |
+ |
+class TestCommandRunnerIntegrationTests( |
+ testcase.GsUtilIntegrationTestCase): |
+ """Integration tests for gsutil update check in command_runner module.""" |
+ |
+ def setUp(self): |
+ """Sets up the command runner mock objects.""" |
+ super(TestCommandRunnerIntegrationTests, self).setUp() |
+ |
+ # Mock out the timestamp file so we can manipulate it. |
+ self.previous_update_file = ( |
+ command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE) |
+ self.timestamp_file = self.CreateTempFile(contents='0') |
+ command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE = ( |
+ self.timestamp_file) |
+ |
+ # Mock out raw_input to trigger yes prompt. |
+ command_runner.raw_input = lambda p: 'y' |
+ |
+ def tearDown(self): |
+ """Tears down the command runner mock objects.""" |
+ super(TestCommandRunnerIntegrationTests, self).tearDown() |
+ command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE = ( |
+ self.previous_update_file) |
+ command_runner.raw_input = raw_input |
+ |
+ @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config') |
+ def test_lookup_version_without_credentials(self): |
+ """Tests that gsutil tarball version lookup works without credentials.""" |
+ with SetBotoConfigFileForTest(self.CreateTempFile( |
+ contents='[GSUtil]\nsoftware_update_check_period=1')): |
+ self.command_runner = command_runner.CommandRunner() |
+ # Looking up software version shouldn't get auth failure exception. |
+ self.command_runner.RunNamedCommand('ls', [GSUTIL_PUB_TARBALL]) |