| Index: gslib/tests/test_defacl.py
|
| ===================================================================
|
| --- gslib/tests/test_defacl.py (revision 33376)
|
| +++ gslib/tests/test_defacl.py (working copy)
|
| @@ -1,3 +1,4 @@
|
| +# -*- coding: utf-8 -*-
|
| # Copyright 2013 Google Inc. All Rights Reserved.
|
| #
|
| # Licensed under the Apache License, Version 2.0 (the "License");
|
| @@ -11,110 +12,139 @@
|
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| # See the License for the specific language governing permissions and
|
| # limitations under the License.
|
| +"""Integration tests for the defacl command."""
|
|
|
| +from __future__ import absolute_import
|
| +
|
| +import re
|
| import gslib.tests.testcase as case
|
| +from gslib.tests.testcase.integration_testcase import SkipForS3
|
| from gslib.tests.util import ObjectToURI as suri
|
|
|
| +PUBLIC_READ_JSON_ACL_TEXT = '"entity":"allUsers","role":"READER"'
|
|
|
| +
|
| +@SkipForS3('S3 does not support default object ACLs.')
|
| class TestDefacl(case.GsUtilIntegrationTestCase):
|
| - """Tests gslib.commands.defacl."""
|
| + """Integration tests for the defacl command."""
|
|
|
| _defacl_ch_prefix = ['defacl', 'ch']
|
| _defacl_get_prefix = ['defacl', 'get']
|
| _defacl_set_prefix = ['defacl', 'set']
|
|
|
| - def _MakeScopeRegex(self, scope_type, email_address, perm):
|
| - template_regex = (
|
| - r'<Scope type="{0}">\s*<EmailAddress>\s*{1}\s*</EmailAddress>\s*'
|
| - r'</Scope>\s*<Permission>\s*{2}\s*</Permission>')
|
| - return template_regex.format(scope_type, email_address, perm)
|
| + def _MakeScopeRegex(self, role, entity_type, email_address):
|
| + template_regex = (r'\{.*"entity":\s*"%s-%s".*"role":\s*"%s".*\}' %
|
| + (entity_type, email_address, role))
|
| + return re.compile(template_regex, flags=re.DOTALL)
|
|
|
| def testChangeDefaultAcl(self):
|
| + """Tests defacl ch."""
|
| bucket = self.CreateBucket()
|
|
|
| test_regex = self._MakeScopeRegex(
|
| - 'GroupByEmail', self.GROUP_TEST_ADDRESS, 'READ')
|
| - xml = self.RunGsUtil(self._defacl_get_prefix +
|
| - [suri(bucket)], return_stdout=True)
|
| - self.assertNotRegexpMatches(xml, test_regex)
|
| + 'OWNER', 'group', self.GROUP_TEST_ADDRESS)
|
| + test_regex2 = self._MakeScopeRegex(
|
| + 'READER', 'group', self.GROUP_TEST_ADDRESS)
|
| + json_text = self.RunGsUtil(self._defacl_get_prefix +
|
| + [suri(bucket)], return_stdout=True)
|
| + self.assertNotRegexpMatches(json_text, test_regex)
|
|
|
| self.RunGsUtil(self._defacl_ch_prefix +
|
| - ['-g', self.GROUP_TEST_ADDRESS+':READ', suri(bucket)])
|
| - xml = self.RunGsUtil(self._defacl_get_prefix +
|
| - [suri(bucket)], return_stdout=True)
|
| - self.assertRegexpMatches(xml, test_regex)
|
| + ['-g', self.GROUP_TEST_ADDRESS+':FC', suri(bucket)])
|
| + json_text2 = self.RunGsUtil(self._defacl_get_prefix +
|
| + [suri(bucket)], return_stdout=True)
|
| + self.assertRegexpMatches(json_text2, test_regex)
|
|
|
| + self.RunGsUtil(self._defacl_ch_prefix +
|
| + ['-g', self.GROUP_TEST_ADDRESS+':READ', suri(bucket)])
|
| + json_text3 = self.RunGsUtil(self._defacl_get_prefix +
|
| + [suri(bucket)], return_stdout=True)
|
| + self.assertRegexpMatches(json_text3, test_regex2)
|
| +
|
| + stderr = self.RunGsUtil(self._defacl_ch_prefix +
|
| + ['-g', self.GROUP_TEST_ADDRESS+':WRITE',
|
| + suri(bucket)],
|
| + return_stderr=True, expected_status=1)
|
| + self.assertIn('WRITER cannot be set as a default object ACL', stderr)
|
| +
|
| def testChangeMultipleBuckets(self):
|
| + """Tests defacl ch on multiple buckets."""
|
| bucket1 = self.CreateBucket()
|
| bucket2 = self.CreateBucket()
|
|
|
| test_regex = self._MakeScopeRegex(
|
| - 'GroupByEmail', self.GROUP_TEST_ADDRESS, 'READ')
|
| - xml = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket1)],
|
| - return_stdout=True)
|
| - self.assertNotRegexpMatches(xml, test_regex)
|
| - xml = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket2)],
|
| - return_stdout=True)
|
| - self.assertNotRegexpMatches(xml, test_regex)
|
| + 'READER', 'group', self.GROUP_TEST_ADDRESS)
|
| + json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket1)],
|
| + return_stdout=True)
|
| + self.assertNotRegexpMatches(json_text, test_regex)
|
| + json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket2)],
|
| + return_stdout=True)
|
| + self.assertNotRegexpMatches(json_text, test_regex)
|
|
|
| self.RunGsUtil(self._defacl_ch_prefix +
|
| ['-g', self.GROUP_TEST_ADDRESS+':READ',
|
| suri(bucket1), suri(bucket2)])
|
| - xml = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket1)],
|
| - return_stdout=True)
|
| - self.assertRegexpMatches(xml, test_regex)
|
| - xml = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket2)],
|
| - return_stdout=True)
|
| - self.assertRegexpMatches(xml, test_regex)
|
| + json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket1)],
|
| + return_stdout=True)
|
| + self.assertRegexpMatches(json_text, test_regex)
|
| + json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket2)],
|
| + return_stdout=True)
|
| + self.assertRegexpMatches(json_text, test_regex)
|
|
|
| def testChangeMultipleAcls(self):
|
| + """Tests defacl ch with multiple ACL entries."""
|
| bucket = self.CreateBucket()
|
|
|
| test_regex_group = self._MakeScopeRegex(
|
| - 'GroupByEmail', self.GROUP_TEST_ADDRESS, 'READ')
|
| + 'READER', 'group', self.GROUP_TEST_ADDRESS)
|
| test_regex_user = self._MakeScopeRegex(
|
| - 'UserByEmail', self.USER_TEST_ADDRESS, 'FULL_CONTROL')
|
| - xml = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket)],
|
| - return_stdout=True)
|
| - self.assertNotRegexpMatches(xml, test_regex_group)
|
| - self.assertNotRegexpMatches(xml, test_regex_user)
|
| + 'OWNER', 'user', self.USER_TEST_ADDRESS)
|
| + json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket)],
|
| + return_stdout=True)
|
| + self.assertNotRegexpMatches(json_text, test_regex_group)
|
| + self.assertNotRegexpMatches(json_text, test_regex_user)
|
|
|
| self.RunGsUtil(self._defacl_ch_prefix +
|
| ['-g', self.GROUP_TEST_ADDRESS+':READ',
|
| '-u', self.USER_TEST_ADDRESS+':fc', suri(bucket)])
|
| - xml = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket)],
|
| - return_stdout=True)
|
| - self.assertRegexpMatches(xml, test_regex_group)
|
| - self.assertRegexpMatches(xml, test_regex_user)
|
| + json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket)],
|
| + return_stdout=True)
|
| + self.assertRegexpMatches(json_text, test_regex_group)
|
| + self.assertRegexpMatches(json_text, test_regex_user)
|
|
|
| def testEmptyDefAcl(self):
|
| bucket = self.CreateBucket()
|
| self.RunGsUtil(self._defacl_set_prefix + ['private', suri(bucket)])
|
| + stdout = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket)],
|
| + return_stdout=True)
|
| + self.assertEquals(stdout.rstrip(), '[]')
|
| self.RunGsUtil(self._defacl_ch_prefix +
|
| ['-u', self.USER_TEST_ADDRESS+':fc', suri(bucket)])
|
|
|
| def testDeletePermissionsWithCh(self):
|
| + """Tests removing permissions with defacl ch."""
|
| bucket = self.CreateBucket()
|
|
|
| test_regex = self._MakeScopeRegex(
|
| - 'UserByEmail', self.USER_TEST_ADDRESS, 'FULL_CONTROL')
|
| - xml = self.RunGsUtil(
|
| + 'OWNER', 'user', self.USER_TEST_ADDRESS)
|
| + json_text = self.RunGsUtil(
|
| self._defacl_get_prefix + [suri(bucket)], return_stdout=True)
|
| - self.assertNotRegexpMatches(xml, test_regex)
|
| + self.assertNotRegexpMatches(json_text, test_regex)
|
|
|
| self.RunGsUtil(self._defacl_ch_prefix +
|
| ['-u', self.USER_TEST_ADDRESS+':fc', suri(bucket)])
|
| - xml = self.RunGsUtil(
|
| + json_text = self.RunGsUtil(
|
| self._defacl_get_prefix + [suri(bucket)], return_stdout=True)
|
| - self.assertRegexpMatches(xml, test_regex)
|
| + self.assertRegexpMatches(json_text, test_regex)
|
|
|
| self.RunGsUtil(self._defacl_ch_prefix +
|
| ['-d', self.USER_TEST_ADDRESS, suri(bucket)])
|
| - xml = self.RunGsUtil(
|
| + json_text = self.RunGsUtil(
|
| self._defacl_get_prefix + [suri(bucket)], return_stdout=True)
|
| - self.assertNotRegexpMatches(xml, test_regex)
|
| + self.assertNotRegexpMatches(json_text, test_regex)
|
|
|
| def testTooFewArgumentsFails(self):
|
| + """Tests calling defacl with insufficient number of arguments."""
|
| # No arguments for get, but valid subcommand.
|
| stderr = self.RunGsUtil(self._defacl_get_prefix, return_stderr=True,
|
| expected_status=1)
|
| @@ -134,7 +164,8 @@
|
| stderr = self.RunGsUtil(['defacl'], return_stderr=True, expected_status=1)
|
| self.assertIn('command requires at least', stderr)
|
|
|
| +
|
| class TestDefaclOldAlias(TestDefacl):
|
| - _defacl_ch_prefix= ['chdefacl']
|
| + _defacl_ch_prefix = ['chdefacl']
|
| _defacl_get_prefix = ['getdefacl']
|
| _defacl_set_prefix = ['setdefacl']
|
|
|