OLD | NEW |
| (Empty) |
1 # -*- coding: utf-8 -*- | |
2 # Copyright 2011 Google Inc. All Rights Reserved. | |
3 # | |
4 # Licensed under the Apache License, Version 2.0 (the "License"); | |
5 # you may not use this file except in compliance with the License. | |
6 # You may obtain a copy of the License at | |
7 # | |
8 # http://www.apache.org/licenses/LICENSE-2.0 | |
9 # | |
10 # Unless required by applicable law or agreed to in writing, software | |
11 # distributed under the License is distributed on an "AS IS" BASIS, | |
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 # See the License for the specific language governing permissions and | |
14 # limitations under the License. | |
15 """Unit and integration tests for gsutil command_runner module.""" | |
16 | |
17 from __future__ import absolute_import | |
18 | |
19 import logging | |
20 import os | |
21 import time | |
22 | |
23 import gslib | |
24 from gslib import command_runner | |
25 from gslib.command import Command | |
26 from gslib.command_argument import CommandArgument | |
27 from gslib.command_runner import CommandRunner | |
28 from gslib.command_runner import HandleArgCoding | |
29 from gslib.exception import CommandException | |
30 from gslib.tab_complete import CloudObjectCompleter | |
31 from gslib.tab_complete import CloudOrLocalObjectCompleter | |
32 from gslib.tab_complete import LocalObjectCompleter | |
33 from gslib.tab_complete import LocalObjectOrCannedACLCompleter | |
34 from gslib.tab_complete import NoOpCompleter | |
35 import gslib.tests.testcase as testcase | |
36 import gslib.tests.util as util | |
37 from gslib.tests.util import ARGCOMPLETE_AVAILABLE | |
38 from gslib.tests.util import SetBotoConfigFileForTest | |
39 from gslib.tests.util import SetBotoConfigForTest | |
40 from gslib.tests.util import unittest | |
41 from gslib.util import GSUTIL_PUB_TARBALL | |
42 from gslib.util import SECONDS_PER_DAY | |
43 | |
44 | |
45 class FakeArgparseArgument(object): | |
46 """Fake for argparse parser argument.""" | |
47 pass | |
48 | |
49 | |
50 class FakeArgparseParser(object): | |
51 """Fake for argparse parser.""" | |
52 | |
53 def __init__(self): | |
54 self.arguments = [] | |
55 | |
56 def add_argument(self, *unused_args, **unused_kwargs): | |
57 argument = FakeArgparseArgument() | |
58 self.arguments.append(argument) | |
59 return argument | |
60 | |
61 | |
62 class FakeArgparseSubparsers(object): | |
63 """Container for nested parsers.""" | |
64 | |
65 def __init__(self): | |
66 self.parsers = [] | |
67 | |
68 def add_parser(self, unused_name, **unused_kwargs): | |
69 parser = FakeArgparseParser() | |
70 self.parsers.append(parser) | |
71 return parser | |
72 | |
73 | |
74 class FakeCommandWithInvalidCompleter(Command): | |
75 """Command with an invalid completer on an argument.""" | |
76 | |
77 command_spec = Command.CreateCommandSpec( | |
78 'fake1', | |
79 argparse_arguments=[ | |
80 CommandArgument('arg', completer='BAD') | |
81 ] | |
82 ) | |
83 | |
84 help_spec = Command.HelpSpec( | |
85 help_name='fake1', | |
86 help_name_aliases=[], | |
87 help_type='command_help', | |
88 help_one_line_summary='fake command for tests', | |
89 help_text='fake command for tests', | |
90 subcommand_help_text={} | |
91 ) | |
92 | |
93 def __init__(self): | |
94 pass | |
95 | |
96 | |
97 class FakeCommandWithCompleters(Command): | |
98 """Command with various completer types.""" | |
99 | |
100 command_spec = Command.CreateCommandSpec( | |
101 'fake2', | |
102 argparse_arguments=[ | |
103 CommandArgument.MakeZeroOrMoreCloudURLsArgument(), | |
104 CommandArgument.MakeZeroOrMoreFileURLsArgument(), | |
105 CommandArgument.MakeZeroOrMoreCloudOrFileURLsArgument(), | |
106 CommandArgument.MakeFreeTextArgument(), | |
107 CommandArgument.MakeZeroOrMoreCloudBucketURLsArgument(), | |
108 CommandArgument.MakeFileURLOrCannedACLArgument(), | |
109 ] | |
110 ) | |
111 | |
112 help_spec = Command.HelpSpec( | |
113 help_name='fake2', | |
114 help_name_aliases=[], | |
115 help_type='command_help', | |
116 help_one_line_summary='fake command for tests', | |
117 help_text='fake command for tests', | |
118 subcommand_help_text={} | |
119 ) | |
120 | |
121 def __init__(self): | |
122 pass | |
123 | |
124 | |
125 class TestCommandRunnerUnitTests( | |
126 testcase.unit_testcase.GsUtilUnitTestCase): | |
127 """Unit tests for gsutil update check in command_runner module.""" | |
128 | |
129 def setUp(self): | |
130 """Sets up the command runner mock objects.""" | |
131 super(TestCommandRunnerUnitTests, self).setUp() | |
132 | |
133 # Mock out the timestamp file so we can manipulate it. | |
134 self.previous_update_file = ( | |
135 command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE) | |
136 self.timestamp_file = self.CreateTempFile() | |
137 command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE = ( | |
138 self.timestamp_file) | |
139 | |
140 # Mock out the gsutil version checker. | |
141 base_version = unicode(gslib.VERSION) | |
142 while not base_version.isnumeric(): | |
143 if not base_version: | |
144 raise CommandException( | |
145 'Version number (%s) is not numeric.' % gslib.VERSION) | |
146 base_version = base_version[:-1] | |
147 command_runner.LookUpGsutilVersion = lambda u, v: float(base_version) + 1 | |
148 | |
149 # Mock out raw_input to trigger yes prompt. | |
150 command_runner.raw_input = lambda p: 'y' | |
151 | |
152 # Mock out TTY check to pretend we're on a TTY even if we're not. | |
153 self.running_interactively = True | |
154 command_runner.IsRunningInteractively = lambda: self.running_interactively | |
155 | |
156 # Mock out the modified time of the VERSION file. | |
157 self.version_mod_time = 0 | |
158 self.previous_version_mod_time = command_runner.GetGsutilVersionModifiedTime | |
159 command_runner.GetGsutilVersionModifiedTime = lambda: self.version_mod_time | |
160 | |
161 # Create a fake pub tarball that will be used to check for gsutil version. | |
162 self.pub_bucket_uri = self.CreateBucket('pub') | |
163 self.gsutil_tarball_uri = self.CreateObject( | |
164 bucket_uri=self.pub_bucket_uri, object_name='gsutil.tar.gz', | |
165 contents='foo') | |
166 | |
167 def tearDown(self): | |
168 """Tears down the command runner mock objects.""" | |
169 super(TestCommandRunnerUnitTests, self).tearDown() | |
170 | |
171 command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE = ( | |
172 self.previous_update_file) | |
173 command_runner.LookUpGsutilVersion = gslib.util.LookUpGsutilVersion | |
174 command_runner.raw_input = raw_input | |
175 | |
176 command_runner.GetGsutilVersionModifiedTime = self.previous_version_mod_time | |
177 | |
178 command_runner.IsRunningInteractively = gslib.util.IsRunningInteractively | |
179 | |
180 self.gsutil_tarball_uri.delete_key() | |
181 self.pub_bucket_uri.delete_bucket() | |
182 | |
183 def _IsPackageOrCloudSDKInstall(self): | |
184 # Update should not trigger for package installs or Cloud SDK installs. | |
185 return (gslib.IS_PACKAGE_INSTALL or | |
186 os.environ.get('CLOUDSDK_WRAPPER') == '1') | |
187 | |
188 @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config') | |
189 def test_not_interactive(self): | |
190 """Tests that update is not triggered if not running interactively.""" | |
191 with SetBotoConfigForTest([ | |
192 ('GSUtil', 'software_update_check_period', '1')]): | |
193 with open(self.timestamp_file, 'w') as f: | |
194 f.write(str(int(time.time() - 2 * SECONDS_PER_DAY))) | |
195 self.running_interactively = False | |
196 self.assertEqual( | |
197 False, | |
198 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0)) | |
199 | |
200 @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config') | |
201 def test_no_tracker_file_version_recent(self): | |
202 """Tests when no timestamp file exists and VERSION file is recent.""" | |
203 if os.path.exists(self.timestamp_file): | |
204 os.remove(self.timestamp_file) | |
205 self.assertFalse(os.path.exists(self.timestamp_file)) | |
206 self.version_mod_time = time.time() | |
207 self.assertEqual( | |
208 False, | |
209 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0)) | |
210 | |
211 @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config') | |
212 def test_no_tracker_file_version_old(self): | |
213 """Tests when no timestamp file exists and VERSION file is old.""" | |
214 if os.path.exists(self.timestamp_file): | |
215 os.remove(self.timestamp_file) | |
216 self.assertFalse(os.path.exists(self.timestamp_file)) | |
217 self.version_mod_time = 0 | |
218 expected = not self._IsPackageOrCloudSDKInstall() | |
219 self.assertEqual( | |
220 expected, | |
221 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0)) | |
222 | |
223 @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config') | |
224 def test_invalid_commands(self): | |
225 """Tests that update is not triggered for certain commands.""" | |
226 self.assertEqual( | |
227 False, | |
228 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('update', 0)) | |
229 | |
230 @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config') | |
231 def test_invalid_file_contents(self): | |
232 """Tests no update if timestamp file has invalid value.""" | |
233 with open(self.timestamp_file, 'w') as f: | |
234 f.write('NaN') | |
235 self.assertEqual( | |
236 False, | |
237 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0)) | |
238 | |
239 @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config') | |
240 def test_update_should_trigger(self): | |
241 """Tests update should be triggered if time is up.""" | |
242 with SetBotoConfigForTest([ | |
243 ('GSUtil', 'software_update_check_period', '1')]): | |
244 with open(self.timestamp_file, 'w') as f: | |
245 f.write(str(int(time.time() - 2 * SECONDS_PER_DAY))) | |
246 expected = not self._IsPackageOrCloudSDKInstall() | |
247 self.assertEqual( | |
248 expected, | |
249 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0)) | |
250 | |
251 @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config') | |
252 def test_not_time_for_update_yet(self): | |
253 """Tests update not triggered if not time yet.""" | |
254 with SetBotoConfigForTest([ | |
255 ('GSUtil', 'software_update_check_period', '3')]): | |
256 with open(self.timestamp_file, 'w') as f: | |
257 f.write(str(int(time.time() - 2 * SECONDS_PER_DAY))) | |
258 self.assertEqual( | |
259 False, | |
260 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0)) | |
261 | |
262 def test_user_says_no_to_update(self): | |
263 """Tests no update triggered if user says no at the prompt.""" | |
264 with SetBotoConfigForTest([ | |
265 ('GSUtil', 'software_update_check_period', '1')]): | |
266 with open(self.timestamp_file, 'w') as f: | |
267 f.write(str(int(time.time() - 2 * SECONDS_PER_DAY))) | |
268 command_runner.raw_input = lambda p: 'n' | |
269 self.assertEqual( | |
270 False, | |
271 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0)) | |
272 | |
273 @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config') | |
274 def test_update_check_skipped_with_quiet_mode(self): | |
275 """Tests that update isn't triggered when loglevel is in quiet mode.""" | |
276 with SetBotoConfigForTest([ | |
277 ('GSUtil', 'software_update_check_period', '1')]): | |
278 with open(self.timestamp_file, 'w') as f: | |
279 f.write(str(int(time.time() - 2 * SECONDS_PER_DAY))) | |
280 | |
281 expected = not self._IsPackageOrCloudSDKInstall() | |
282 self.assertEqual( | |
283 expected, | |
284 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0)) | |
285 | |
286 prev_loglevel = logging.getLogger().getEffectiveLevel() | |
287 try: | |
288 logging.getLogger().setLevel(logging.ERROR) | |
289 # With reduced loglevel, should return False. | |
290 self.assertEqual( | |
291 False, | |
292 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0)) | |
293 finally: | |
294 logging.getLogger().setLevel(prev_loglevel) | |
295 | |
296 def test_command_argument_parser_setup_invalid_completer(self): | |
297 | |
298 command_map = { | |
299 FakeCommandWithInvalidCompleter.command_spec.command_name: | |
300 FakeCommandWithInvalidCompleter() | |
301 } | |
302 | |
303 runner = CommandRunner( | |
304 bucket_storage_uri_class=self.mock_bucket_storage_uri, | |
305 gsutil_api_class_map_factory=self.mock_gsutil_api_class_map_factory, | |
306 command_map=command_map) | |
307 | |
308 subparsers = FakeArgparseSubparsers() | |
309 try: | |
310 runner.ConfigureCommandArgumentParsers(subparsers) | |
311 except RuntimeError as e: | |
312 self.assertIn('Unknown completer', e.message) | |
313 | |
314 @unittest.skipUnless(ARGCOMPLETE_AVAILABLE, | |
315 'Tab completion requires argcomplete') | |
316 def test_command_argument_parser_setup_completers(self): | |
317 | |
318 command_map = { | |
319 FakeCommandWithCompleters.command_spec.command_name: | |
320 FakeCommandWithCompleters() | |
321 } | |
322 | |
323 runner = CommandRunner( | |
324 bucket_storage_uri_class=self.mock_bucket_storage_uri, | |
325 gsutil_api_class_map_factory=self.mock_gsutil_api_class_map_factory, | |
326 command_map=command_map) | |
327 | |
328 subparsers = FakeArgparseSubparsers() | |
329 runner.ConfigureCommandArgumentParsers(subparsers) | |
330 | |
331 self.assertEqual(1, len(subparsers.parsers)) | |
332 parser = subparsers.parsers[0] | |
333 self.assertEqual(6, len(parser.arguments)) | |
334 self.assertEqual(CloudObjectCompleter, type(parser.arguments[0].completer)) | |
335 self.assertEqual(LocalObjectCompleter, type(parser.arguments[1].completer)) | |
336 self.assertEqual( | |
337 CloudOrLocalObjectCompleter, type(parser.arguments[2].completer)) | |
338 self.assertEqual( | |
339 NoOpCompleter, type(parser.arguments[3].completer)) | |
340 self.assertEqual(CloudObjectCompleter, type(parser.arguments[4].completer)) | |
341 self.assertEqual( | |
342 LocalObjectOrCannedACLCompleter, type(parser.arguments[5].completer)) | |
343 | |
344 # pylint: disable=invalid-encoded-data | |
345 def test_valid_arg_coding(self): | |
346 """Tests that gsutil encodes valid args correctly.""" | |
347 # Args other than -h and -p should be utf-8 decoded. | |
348 args = HandleArgCoding(['ls', '-l']) | |
349 self.assertIs(type(args[0]), unicode) | |
350 self.assertIs(type(args[1]), unicode) | |
351 | |
352 # -p and -h args other than x-goog-meta should not be decoded. | |
353 args = HandleArgCoding(['ls', '-p', 'abc:def', 'gs://bucket']) | |
354 self.assertIs(type(args[0]), unicode) | |
355 self.assertIs(type(args[1]), unicode) | |
356 self.assertIsNot(type(args[2]), unicode) | |
357 self.assertIs(type(args[3]), unicode) | |
358 | |
359 args = HandleArgCoding(['gsutil', '-h', 'content-type:text/plain', 'cp', | |
360 'a', 'gs://bucket']) | |
361 self.assertIs(type(args[0]), unicode) | |
362 self.assertIs(type(args[1]), unicode) | |
363 self.assertIsNot(type(args[2]), unicode) | |
364 self.assertIs(type(args[3]), unicode) | |
365 self.assertIs(type(args[4]), unicode) | |
366 self.assertIs(type(args[5]), unicode) | |
367 | |
368 # -h x-goog-meta args should be decoded. | |
369 args = HandleArgCoding(['gsutil', '-h', 'x-goog-meta-abc', '1234']) | |
370 self.assertIs(type(args[0]), unicode) | |
371 self.assertIs(type(args[1]), unicode) | |
372 self.assertIs(type(args[2]), unicode) | |
373 self.assertIs(type(args[3]), unicode) | |
374 | |
375 # -p and -h args with non-ASCII content should raise CommandException. | |
376 try: | |
377 HandleArgCoding(['ls', '-p', '碼']) | |
378 # Ensure exception is raised. | |
379 self.assertTrue(False) | |
380 except CommandException as e: | |
381 self.assertIn('Invalid non-ASCII header', e.reason) | |
382 try: | |
383 HandleArgCoding(['-h', '碼', 'ls']) | |
384 # Ensure exception is raised. | |
385 self.assertTrue(False) | |
386 except CommandException as e: | |
387 self.assertIn('Invalid non-ASCII header', e.reason) | |
388 | |
389 | |
390 class TestCommandRunnerIntegrationTests( | |
391 testcase.GsUtilIntegrationTestCase): | |
392 """Integration tests for gsutil update check in command_runner module.""" | |
393 | |
394 def setUp(self): | |
395 """Sets up the command runner mock objects.""" | |
396 super(TestCommandRunnerIntegrationTests, self).setUp() | |
397 | |
398 # Mock out the timestamp file so we can manipulate it. | |
399 self.previous_update_file = ( | |
400 command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE) | |
401 self.timestamp_file = self.CreateTempFile(contents='0') | |
402 command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE = ( | |
403 self.timestamp_file) | |
404 | |
405 # Mock out raw_input to trigger yes prompt. | |
406 command_runner.raw_input = lambda p: 'y' | |
407 | |
408 def tearDown(self): | |
409 """Tears down the command runner mock objects.""" | |
410 super(TestCommandRunnerIntegrationTests, self).tearDown() | |
411 command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE = ( | |
412 self.previous_update_file) | |
413 command_runner.raw_input = raw_input | |
414 | |
415 @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config') | |
416 def test_lookup_version_without_credentials(self): | |
417 """Tests that gsutil tarball version lookup works without credentials.""" | |
418 with SetBotoConfigFileForTest(self.CreateTempFile( | |
419 contents='[GSUtil]\nsoftware_update_check_period=1')): | |
420 self.command_runner = command_runner.CommandRunner() | |
421 # Looking up software version shouldn't get auth failure exception. | |
422 self.command_runner.RunNamedCommand('ls', [GSUTIL_PUB_TARBALL]) | |
OLD | NEW |