| Index: gslib/tests/test_ls.py
|
| ===================================================================
|
| --- gslib/tests/test_ls.py (revision 33376)
|
| +++ gslib/tests/test_ls.py (working copy)
|
| @@ -1,5 +1,4 @@
|
| # -*- coding: utf-8 -*-
|
| -#
|
| # Copyright 2013 Google Inc. All Rights Reserved.
|
| #
|
| # Licensed under the Apache License, Version 2.0 (the "License");
|
| @@ -13,16 +12,24 @@
|
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| # See the License for the specific language governing permissions and
|
| # limitations under the License.
|
| +"""Tests for ls command."""
|
|
|
| +from __future__ import absolute_import
|
| +
|
| import posixpath
|
| import re
|
| import subprocess
|
| import sys
|
|
|
| import gslib
|
| +from gslib.cs_api_map import ApiSelector
|
| import gslib.tests.testcase as testcase
|
| +from gslib.tests.testcase.integration_testcase import SkipForS3
|
| +from gslib.tests.util import ObjectToURI as suri
|
| +from gslib.tests.util import unittest
|
| +from gslib.util import IS_WINDOWS
|
| from gslib.util import Retry
|
| -from gslib.tests.util import ObjectToURI as suri
|
| +from gslib.util import UTF8
|
|
|
|
|
| class TestLs(testcase.GsUtilIntegrationTestCase):
|
| @@ -33,12 +40,7 @@
|
|
|
| def test_empty_bucket(self):
|
| bucket_uri = self.CreateBucket()
|
| - # Use @Retry as hedge against bucket listing eventual consistency.
|
| - @Retry(AssertionError, tries=3, timeout_secs=1)
|
| - def _Check1():
|
| - stdout = self.RunGsUtil(['ls', suri(bucket_uri)], return_stdout=True)
|
| - self.assertEqual('', stdout)
|
| - _Check1()
|
| + self.AssertNObjectsInBucket(bucket_uri, 0)
|
|
|
| def test_empty_bucket_with_b(self):
|
| bucket_uri = self.CreateBucket()
|
| @@ -51,6 +53,7 @@
|
| _Check1()
|
|
|
| def test_bucket_with_Lb(self):
|
| + """Tests ls -Lb."""
|
| bucket_uri = self.CreateBucket()
|
| # Use @Retry as hedge against bucket listing eventual consistency.
|
| @Retry(AssertionError, tries=3, timeout_secs=1)
|
| @@ -62,6 +65,7 @@
|
| _Check1()
|
|
|
| def test_bucket_with_lb(self):
|
| + """Tests ls -lb."""
|
| bucket_uri = self.CreateBucket()
|
| # Use @Retry as hedge against bucket listing eventual consistency.
|
| @Retry(AssertionError, tries=3, timeout_secs=1)
|
| @@ -73,6 +77,7 @@
|
| _Check1()
|
|
|
| def test_bucket_list_wildcard(self):
|
| + """Tests listing multiple buckets with a wildcard."""
|
| random_prefix = self.MakeRandomTestString()
|
| bucket1_name = self.MakeTempName('bucket', prefix=random_prefix)
|
| bucket2_name = self.MakeTempName('bucket', prefix=random_prefix)
|
| @@ -85,7 +90,8 @@
|
| common_prefix = posixpath.commonprefix([suri(bucket1_uri),
|
| suri(bucket2_uri)])
|
| self.assertTrue(common_prefix.startswith(
|
| - 'gs://%sgsutil-test-test_bucket_list_wildcard-bucket-' % random_prefix))
|
| + '%s://%sgsutil-test-test_bucket_list_wildcard-bucket-' %
|
| + (self.default_provider, random_prefix)))
|
| wildcard = '%s*' % common_prefix
|
|
|
| # Use @Retry as hedge against bucket listing eventual consistency.
|
| @@ -98,33 +104,41 @@
|
| _Check1()
|
|
|
| def test_nonexistent_bucket_with_ls(self):
|
| + """Tests a bucket that is known not to exist."""
|
| stderr = self.RunGsUtil(
|
| - ['ls', '-lb', 'gs://%s' % self.NONEXISTENT_BUCKET_NAME],
|
| + ['ls', '-lb', 'gs://%s' % self.nonexistent_bucket_name],
|
| return_stderr=True, expected_status=1)
|
| self.assertIn('404', stderr)
|
|
|
| stderr = self.RunGsUtil(
|
| - ['ls', '-Lb', 'gs://%s' % self.NONEXISTENT_BUCKET_NAME],
|
| + ['ls', '-Lb', 'gs://%s' % self.nonexistent_bucket_name],
|
| return_stderr=True, expected_status=1)
|
| self.assertIn('404', stderr)
|
|
|
| stderr = self.RunGsUtil(
|
| - ['ls', '-b', 'gs://%s' % self.NONEXISTENT_BUCKET_NAME],
|
| + ['ls', '-b', 'gs://%s' % self.nonexistent_bucket_name],
|
| return_stderr=True, expected_status=1)
|
| self.assertIn('404', stderr)
|
|
|
| + def test_list_missing_object(self):
|
| + """Tests listing a non-existent object."""
|
| + bucket_uri = self.CreateBucket()
|
| + stderr = self.RunGsUtil(['ls', suri(bucket_uri, 'missing')],
|
| + return_stderr=True, expected_status=1)
|
| + self.assertIn('matched no objects', stderr)
|
| +
|
| def test_with_one_object(self):
|
| - bucket_uri = self.CreateBucket(test_objects=1)
|
| - objuri = [suri(bucket_uri.clone_replace_name(key.name))
|
| - for key in bucket_uri.list_bucket()][0]
|
| + bucket_uri = self.CreateBucket()
|
| + obj_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo')
|
| # Use @Retry as hedge against bucket listing eventual consistency.
|
| @Retry(AssertionError, tries=3, timeout_secs=1)
|
| def _Check1():
|
| stdout = self.RunGsUtil(['ls', suri(bucket_uri)], return_stdout=True)
|
| - self.assertEqual('%s\n' % objuri, stdout)
|
| + self.assertEqual('%s\n' % obj_uri, stdout)
|
| _Check1()
|
|
|
| def test_subdir(self):
|
| + """Tests listing a bucket subdirectory."""
|
| bucket_uri = self.CreateBucket(test_objects=1)
|
| k1_uri = bucket_uri.clone_replace_name('foo')
|
| k1_uri.set_contents_from_string('baz')
|
| @@ -141,16 +155,19 @@
|
| _Check1()
|
|
|
| def test_versioning(self):
|
| + """Tests listing a versioned bucket."""
|
| bucket1_uri = self.CreateBucket(test_objects=1)
|
| bucket2_uri = self.CreateVersionedBucket(test_objects=1)
|
| + self.AssertNObjectsInBucket(bucket1_uri, 1, versioned=True)
|
| bucket_list = list(bucket1_uri.list_bucket())
|
| +
|
| objuri = [bucket1_uri.clone_replace_key(key).versionless_uri
|
| for key in bucket_list][0]
|
| self.RunGsUtil(['cp', objuri, suri(bucket2_uri)])
|
| self.RunGsUtil(['cp', objuri, suri(bucket2_uri)])
|
| # Use @Retry as hedge against bucket listing eventual consistency.
|
| @Retry(AssertionError, tries=3, timeout_secs=1)
|
| - def _Check1():
|
| + def _Check2():
|
| stdout = self.RunGsUtil(['ls', '-a', suri(bucket2_uri)],
|
| return_stdout=True)
|
| self.assertNumLines(stdout, 3)
|
| @@ -159,30 +176,46 @@
|
| self.assertIn('%s#' % bucket2_uri.clone_replace_name(bucket_list[0].name),
|
| stdout)
|
| self.assertIn('metageneration=', stdout)
|
| - _Check1()
|
| + _Check2()
|
|
|
| def test_etag(self):
|
| + """Tests that listing an object with an etag."""
|
| bucket_uri = self.CreateBucket()
|
| obj_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo')
|
| - etag = obj_uri.get_key().etag
|
| + # TODO: When testcase setup can use JSON, match against the exact JSON
|
| + # etag.
|
| + etag = obj_uri.get_key().etag.strip('"\'')
|
| # Use @Retry as hedge against bucket listing eventual consistency.
|
| @Retry(AssertionError, tries=3, timeout_secs=1)
|
| def _Check1():
|
| stdout = self.RunGsUtil(['ls', '-l', suri(bucket_uri)],
|
| return_stdout=True)
|
| - self.assertNotIn(etag, stdout)
|
| + if self.test_api == ApiSelector.XML:
|
| + self.assertNotIn(etag, stdout)
|
| + else:
|
| + self.assertNotIn('etag=', stdout)
|
| + _Check1()
|
|
|
| + def _Check2():
|
| stdout = self.RunGsUtil(['ls', '-le', suri(bucket_uri)],
|
| return_stdout=True)
|
| - self.assertIn(etag, stdout)
|
| + if self.test_api == ApiSelector.XML:
|
| + self.assertIn(etag, stdout)
|
| + else:
|
| + self.assertIn('etag=', stdout)
|
| + _Check2()
|
|
|
| + def _Check3():
|
| stdout = self.RunGsUtil(['ls', '-ale', suri(bucket_uri)],
|
| return_stdout=True)
|
| - self.assertIn(etag, stdout)
|
| + if self.test_api == ApiSelector.XML:
|
| + self.assertIn(etag, stdout)
|
| + else:
|
| + self.assertIn('etag=', stdout)
|
| + _Check3()
|
|
|
| - _Check1()
|
| -
|
| def test_list_sizes(self):
|
| + """Tests various size listing options."""
|
| bucket_uri = self.CreateBucket()
|
| self.CreateObject(bucket_uri=bucket_uri, contents='x' * 2048)
|
|
|
| @@ -226,22 +259,44 @@
|
| self.assertIn('2 KB', stdout)
|
| _Check5()
|
|
|
| + @unittest.skipIf(IS_WINDOWS,
|
| + 'Unicode handling on Windows requires mods to site-packages')
|
| def test_list_unicode_filename(self):
|
| + """Tests listing an object with a unicode filename."""
|
| + # Note: This test fails on Windows (command.exe). I was able to get ls to
|
| + # output Unicode filenames correctly by hacking the UniStream class code
|
| + # shown at
|
| + # http://stackoverflow.com/questions/878972/windows-cmd-encoding-change-causes-python-crash/3259271
|
| + # into the start of gslib/commands/ls.py, along with no-op flush and
|
| + # isastream functions (as an experiment). However, even with that change,
|
| + # the current test still fails, since it also needs to run that
|
| + # stdout/stderr-replacement code. That UniStream class replacement really
|
| + # needs to be added to the site-packages on Windows python.
|
| object_name = u'Аудиоархив'
|
| - object_name_bytes = object_name.encode('utf-8')
|
| + object_name_bytes = object_name.encode(UTF8)
|
| bucket_uri = self.CreateVersionedBucket()
|
| key_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo',
|
| object_name=object_name)
|
| + self.AssertNObjectsInBucket(bucket_uri, 1, versioned=True)
|
| stdout = self.RunGsUtil(['ls', '-ael', suri(key_uri)],
|
| return_stdout=True)
|
| self.assertIn(object_name_bytes, stdout)
|
| - self.assertIn(key_uri.generation, stdout)
|
| - self.assertIn(
|
| - 'metageneration=%s' % key_uri.get_key().metageneration, stdout)
|
| - self.assertIn(
|
| - 'etag=%s' % key_uri.get_key().etag, stdout)
|
| + if self.default_provider == 'gs':
|
| + self.assertIn(key_uri.generation, stdout)
|
| + self.assertIn(
|
| + 'metageneration=%s' % key_uri.get_key().metageneration, stdout)
|
| + if self.test_api == ApiSelector.XML:
|
| + self.assertIn(key_uri.get_key().etag.strip('"\''), stdout)
|
| + else:
|
| + # TODO: When testcase setup can use JSON, match against the exact JSON
|
| + # etag.
|
| + self.assertIn('etag=', stdout)
|
| + elif self.default_provider == 's3':
|
| + self.assertIn(key_uri.version_id, stdout)
|
| + self.assertIn(key_uri.get_key().etag.strip('"\''), stdout)
|
|
|
| def test_list_gzip_content_length(self):
|
| + """Tests listing a gzipped object."""
|
| file_size = 10000
|
| file_contents = 'x' * file_size
|
| fpath = self.CreateTempFile(contents=file_contents, file_name='foo.txt')
|
| @@ -262,6 +317,7 @@
|
| _Check1()
|
|
|
| def test_output_chopped(self):
|
| + """Tests that gsutil still succeeds with a truncated stdout."""
|
| bucket_uri = self.CreateBucket(test_objects=2)
|
|
|
| # Run Python with the -u flag so output is not buffered.
|
| @@ -274,3 +330,37 @@
|
| p.wait()
|
| # Make sure it still exited cleanly.
|
| self.assertEqual(p.returncode, 0)
|
| +
|
| + def test_recursive_list_trailing_slash(self):
|
| + """Tests listing an object with a trailing slash."""
|
| + bucket_uri = self.CreateBucket()
|
| + self.CreateObject(bucket_uri=bucket_uri, object_name='/', contents='foo')
|
| + self.AssertNObjectsInBucket(bucket_uri, 1)
|
| + stdout = self.RunGsUtil(['ls', '-R', suri(bucket_uri)], return_stdout=True)
|
| + # Note: The suri function normalizes the URI, so the double slash gets
|
| + # removed.
|
| + self.assertIn(suri(bucket_uri) + '/', stdout)
|
| +
|
| + def test_recursive_list_trailing_two_slash(self):
|
| + """Tests listing an object with two trailing slashes."""
|
| + bucket_uri = self.CreateBucket()
|
| + self.CreateObject(bucket_uri=bucket_uri, object_name='//', contents='foo')
|
| + self.AssertNObjectsInBucket(bucket_uri, 1)
|
| + stdout = self.RunGsUtil(['ls', '-R', suri(bucket_uri)], return_stdout=True)
|
| + # Note: The suri function normalizes the URI, so the double slash gets
|
| + # removed.
|
| + self.assertIn(suri(bucket_uri) + '//', stdout)
|
| +
|
| + @SkipForS3('S3 anonymous access is not supported.')
|
| + def test_get_object_without_list_bucket_permission(self):
|
| + # Bucket is not publicly readable by default.
|
| + bucket_uri = self.CreateBucket()
|
| + object_uri = self.CreateObject(bucket_uri=bucket_uri,
|
| + object_name='permitted', contents='foo')
|
| + # Set this object to be publicly readable.
|
| + self.RunGsUtil(['acl', 'set', 'public-read', suri(object_uri)])
|
| + # Drop credentials.
|
| + with self.SetAnonymousBotoCreds():
|
| + stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)],
|
| + return_stdout=True)
|
| + self.assertIn(suri(object_uri), stdout)
|
|
|