Index: third_party/gsutil/gslib/tests/test_acl.py |
diff --git a/third_party/gsutil/gslib/tests/test_acl.py b/third_party/gsutil/gslib/tests/test_acl.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..79c2c5b982e0ad1b07abbf71729d25b06ef14a4f |
--- /dev/null |
+++ b/third_party/gsutil/gslib/tests/test_acl.py |
@@ -0,0 +1,681 @@ |
+# -*- coding: utf-8 -*- |
+# Copyright 2013 Google Inc. All Rights Reserved. |
+# |
+# Licensed under the Apache License, Version 2.0 (the "License"); |
+# you may not use this file except in compliance with the License. |
+# You may obtain a copy of the License at |
+# |
+# http://www.apache.org/licenses/LICENSE-2.0 |
+# |
+# Unless required by applicable law or agreed to in writing, software |
+# distributed under the License is distributed on an "AS IS" BASIS, |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
+# See the License for the specific language governing permissions and |
+# limitations under the License. |
+"""Integration tests for the acl command.""" |
+ |
+from __future__ import absolute_import |
+ |
+import re |
+ |
+from gslib import aclhelpers |
+from gslib.command import CreateGsutilLogger |
+from gslib.cs_api_map import ApiSelector |
+from gslib.project_id import PopulateProjectId |
+from gslib.storage_url import StorageUrlFromString |
+import gslib.tests.testcase as testcase |
+from gslib.tests.testcase.integration_testcase import SkipForGS |
+from gslib.tests.testcase.integration_testcase import SkipForS3 |
+from gslib.tests.util import ObjectToURI as suri |
+from gslib.translation_helper import AclTranslation |
+from gslib.util import Retry |
+ |
+PUBLIC_READ_JSON_ACL_TEXT = '"entity":"allUsers","role":"READER"' |
+ |
+ |
+class TestAclBase(testcase.GsUtilIntegrationTestCase): |
+ """Integration test case base class for acl command.""" |
+ |
+ _set_acl_prefix = ['acl', 'set'] |
+ _get_acl_prefix = ['acl', 'get'] |
+ _set_defacl_prefix = ['defacl', 'set'] |
+ _ch_acl_prefix = ['acl', 'ch'] |
+ |
+ _project_team = 'owners' |
+ _project_test_acl = '%s-%s' % (_project_team, PopulateProjectId()) |
+ |
+ |
+@SkipForS3('Tests use GS ACL model.') |
+class TestAcl(TestAclBase): |
+ """Integration tests for acl command.""" |
+ |
+ def setUp(self): |
+ super(TestAcl, self).setUp() |
+ self.sample_uri = self.CreateBucket() |
+ self.sample_url = StorageUrlFromString(str(self.sample_uri)) |
+ self.logger = CreateGsutilLogger('acl') |
+ |
+ def test_set_invalid_acl_object(self): |
+ """Ensures that invalid content returns a bad request error.""" |
+ obj_uri = suri(self.CreateObject(contents='foo')) |
+ inpath = self.CreateTempFile(contents='badAcl') |
+ stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, obj_uri], |
+ return_stderr=True, expected_status=1) |
+ self.assertIn('ArgumentException', stderr) |
+ |
+ def test_set_invalid_acl_bucket(self): |
+ """Ensures that invalid content returns a bad request error.""" |
+ bucket_uri = suri(self.CreateBucket()) |
+ inpath = self.CreateTempFile(contents='badAcl') |
+ stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, bucket_uri], |
+ return_stderr=True, expected_status=1) |
+ self.assertIn('ArgumentException', stderr) |
+ |
+ def test_set_xml_acl_json_api_object(self): |
+ """Ensures XML content returns a bad request error and migration warning.""" |
+ obj_uri = suri(self.CreateObject(contents='foo')) |
+ inpath = self.CreateTempFile(contents='<ValidXml></ValidXml>') |
+ stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, obj_uri], |
+ return_stderr=True, expected_status=1) |
+ self.assertIn('ArgumentException', stderr) |
+ self.assertIn('XML ACL data provided', stderr) |
+ |
+ def test_set_xml_acl_json_api_bucket(self): |
+ """Ensures XML content returns a bad request error and migration warning.""" |
+ bucket_uri = suri(self.CreateBucket()) |
+ inpath = self.CreateTempFile(contents='<ValidXml></ValidXml>') |
+ stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, bucket_uri], |
+ return_stderr=True, expected_status=1) |
+ self.assertIn('ArgumentException', stderr) |
+ self.assertIn('XML ACL data provided', stderr) |
+ |
+ def test_set_valid_acl_object(self): |
+ """Tests setting a valid ACL on an object.""" |
+ obj_uri = suri(self.CreateObject(contents='foo')) |
+ acl_string = self.RunGsUtil(self._get_acl_prefix + [obj_uri], |
+ return_stdout=True) |
+ inpath = self.CreateTempFile(contents=acl_string) |
+ self.RunGsUtil(self._set_acl_prefix + ['public-read', obj_uri]) |
+ acl_string2 = self.RunGsUtil(self._get_acl_prefix + [obj_uri], |
+ return_stdout=True) |
+ self.RunGsUtil(self._set_acl_prefix + [inpath, obj_uri]) |
+ acl_string3 = self.RunGsUtil(self._get_acl_prefix + [obj_uri], |
+ return_stdout=True) |
+ |
+ self.assertNotEqual(acl_string, acl_string2) |
+ self.assertEqual(acl_string, acl_string3) |
+ |
+ def test_set_valid_permission_whitespace_object(self): |
+ """Ensures that whitespace is allowed in role and entity elements.""" |
+ obj_uri = suri(self.CreateObject(contents='foo')) |
+ acl_string = self.RunGsUtil(self._get_acl_prefix + [obj_uri], |
+ return_stdout=True) |
+ acl_string = re.sub(r'"role"', r'"role" \n', acl_string) |
+ acl_string = re.sub(r'"entity"', r'\n "entity"', acl_string) |
+ inpath = self.CreateTempFile(contents=acl_string) |
+ |
+ self.RunGsUtil(self._set_acl_prefix + [inpath, obj_uri]) |
+ |
+ def test_set_valid_acl_bucket(self): |
+ """Ensures that valid canned and XML ACLs work with get/set.""" |
+ bucket_uri = suri(self.CreateBucket()) |
+ acl_string = self.RunGsUtil(self._get_acl_prefix + [bucket_uri], |
+ return_stdout=True) |
+ inpath = self.CreateTempFile(contents=acl_string) |
+ self.RunGsUtil(self._set_acl_prefix + ['public-read', bucket_uri]) |
+ acl_string2 = self.RunGsUtil(self._get_acl_prefix + [bucket_uri], |
+ return_stdout=True) |
+ self.RunGsUtil(self._set_acl_prefix + [inpath, bucket_uri]) |
+ acl_string3 = self.RunGsUtil(self._get_acl_prefix + [bucket_uri], |
+ return_stdout=True) |
+ |
+ self.assertNotEqual(acl_string, acl_string2) |
+ self.assertEqual(acl_string, acl_string3) |
+ |
+ def test_invalid_canned_acl_object(self): |
+ """Ensures that an invalid canned ACL returns a CommandException.""" |
+ obj_uri = suri(self.CreateObject(contents='foo')) |
+ stderr = self.RunGsUtil( |
+ self._set_acl_prefix + ['not-a-canned-acl', obj_uri], |
+ return_stderr=True, expected_status=1) |
+ self.assertIn('CommandException', stderr) |
+ self.assertIn('Invalid canned ACL', stderr) |
+ |
+ def test_set_valid_def_acl_bucket(self): |
+ """Ensures that valid default canned and XML ACLs works with get/set.""" |
+ bucket_uri = self.CreateBucket() |
+ |
+ # Default ACL is project private. |
+ obj_uri1 = suri(self.CreateObject(bucket_uri=bucket_uri, contents='foo')) |
+ acl_string = self.RunGsUtil(self._get_acl_prefix + [obj_uri1], |
+ return_stdout=True) |
+ |
+ # Change it to authenticated-read. |
+ self.RunGsUtil( |
+ self._set_defacl_prefix + ['authenticated-read', suri(bucket_uri)]) |
+ obj_uri2 = suri(self.CreateObject(bucket_uri=bucket_uri, contents='foo2')) |
+ acl_string2 = self.RunGsUtil(self._get_acl_prefix + [obj_uri2], |
+ return_stdout=True) |
+ |
+ # Now change it back to the default via XML. |
+ inpath = self.CreateTempFile(contents=acl_string) |
+ self.RunGsUtil(self._set_defacl_prefix + [inpath, suri(bucket_uri)]) |
+ obj_uri3 = suri(self.CreateObject(bucket_uri=bucket_uri, contents='foo3')) |
+ acl_string3 = self.RunGsUtil(self._get_acl_prefix + [obj_uri3], |
+ return_stdout=True) |
+ |
+ self.assertNotEqual(acl_string, acl_string2) |
+ self.assertIn('allAuthenticatedUsers', acl_string2) |
+ self.assertEqual(acl_string, acl_string3) |
+ |
+ def test_acl_set_version_specific_uri(self): |
+ """Tests setting an ACL on a specific version of an object.""" |
+ bucket_uri = self.CreateVersionedBucket() |
+ # Create initial object version. |
+ uri = self.CreateObject(bucket_uri=bucket_uri, contents='data') |
+ # Create a second object version. |
+ inpath = self.CreateTempFile(contents='def') |
+ self.RunGsUtil(['cp', inpath, uri.uri]) |
+ |
+ # Find out the two object version IDs. |
+ lines = self.AssertNObjectsInBucket(bucket_uri, 2, versioned=True) |
+ v0_uri_str, v1_uri_str = lines[0], lines[1] |
+ |
+ # Check that neither version currently has public-read permission |
+ # (default ACL is project-private). |
+ orig_acls = [] |
+ for uri_str in (v0_uri_str, v1_uri_str): |
+ acl = self.RunGsUtil(self._get_acl_prefix + [uri_str], |
+ return_stdout=True) |
+ self.assertNotIn(PUBLIC_READ_JSON_ACL_TEXT, |
+ self._strip_json_whitespace(acl)) |
+ orig_acls.append(acl) |
+ |
+ # Set the ACL for the older version of the object to public-read. |
+ self.RunGsUtil(self._set_acl_prefix + ['public-read', v0_uri_str]) |
+ # Check that the older version's ACL is public-read, but newer version |
+ # is not. |
+ acl = self.RunGsUtil(self._get_acl_prefix + [v0_uri_str], |
+ return_stdout=True) |
+ self.assertIn(PUBLIC_READ_JSON_ACL_TEXT, self._strip_json_whitespace(acl)) |
+ acl = self.RunGsUtil(self._get_acl_prefix + [v1_uri_str], |
+ return_stdout=True) |
+ self.assertNotIn(PUBLIC_READ_JSON_ACL_TEXT, |
+ self._strip_json_whitespace(acl)) |
+ |
+ # Check that reading the ACL with the version-less URI returns the |
+ # original ACL (since the version-less URI means the current version). |
+ acl = self.RunGsUtil(self._get_acl_prefix + [uri.uri], return_stdout=True) |
+ self.assertEqual(acl, orig_acls[0]) |
+ |
+ def _strip_json_whitespace(self, json_text): |
+ return re.sub(r'\s*', '', json_text) |
+ |
+ def testAclChangeWithUserId(self): |
+ change = aclhelpers.AclChange(self.USER_TEST_ID + ':r', |
+ scope_type=aclhelpers.ChangeType.USER) |
+ acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) |
+ change.Execute(self.sample_url, acl, 'acl', self.logger) |
+ self._AssertHas(acl, 'READER', 'UserById', self.USER_TEST_ID) |
+ |
+ def testAclChangeWithGroupId(self): |
+ change = aclhelpers.AclChange(self.GROUP_TEST_ID + ':r', |
+ scope_type=aclhelpers.ChangeType.GROUP) |
+ acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) |
+ change.Execute(self.sample_url, acl, 'acl', self.logger) |
+ self._AssertHas(acl, 'READER', 'GroupById', self.GROUP_TEST_ID) |
+ |
+ def testAclChangeWithUserEmail(self): |
+ change = aclhelpers.AclChange(self.USER_TEST_ADDRESS + ':r', |
+ scope_type=aclhelpers.ChangeType.USER) |
+ acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) |
+ change.Execute(self.sample_url, acl, 'acl', self.logger) |
+ self._AssertHas(acl, 'READER', 'UserByEmail', self.USER_TEST_ADDRESS) |
+ |
+ def testAclChangeWithGroupEmail(self): |
+ change = aclhelpers.AclChange(self.GROUP_TEST_ADDRESS + ':fc', |
+ scope_type=aclhelpers.ChangeType.GROUP) |
+ acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) |
+ change.Execute(self.sample_url, acl, 'acl', self.logger) |
+ self._AssertHas(acl, 'OWNER', 'GroupByEmail', self.GROUP_TEST_ADDRESS) |
+ |
+ def testAclChangeWithDomain(self): |
+ change = aclhelpers.AclChange(self.DOMAIN_TEST + ':READ', |
+ scope_type=aclhelpers.ChangeType.GROUP) |
+ acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) |
+ change.Execute(self.sample_url, acl, 'acl', self.logger) |
+ self._AssertHas(acl, 'READER', 'GroupByDomain', self.DOMAIN_TEST) |
+ |
+ def testAclChangeWithProjectOwners(self): |
+ change = aclhelpers.AclChange(self._project_test_acl + ':READ', |
+ scope_type=aclhelpers.ChangeType.PROJECT) |
+ acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) |
+ change.Execute(self.sample_url, acl, 'acl', self.logger) |
+ self._AssertHas(acl, 'READER', 'Project', self._project_test_acl) |
+ |
+ def testAclChangeWithAllUsers(self): |
+ change = aclhelpers.AclChange('AllUsers:WRITE', |
+ scope_type=aclhelpers.ChangeType.GROUP) |
+ acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) |
+ change.Execute(self.sample_url, acl, 'acl', self.logger) |
+ self._AssertHas(acl, 'WRITER', 'AllUsers') |
+ |
+ def testAclChangeWithAllAuthUsers(self): |
+ change = aclhelpers.AclChange('AllAuthenticatedUsers:READ', |
+ scope_type=aclhelpers.ChangeType.GROUP) |
+ acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) |
+ change.Execute(self.sample_url, acl, 'acl', self.logger) |
+ self._AssertHas(acl, 'READER', 'AllAuthenticatedUsers') |
+ remove = aclhelpers.AclDel('AllAuthenticatedUsers') |
+ remove.Execute(self.sample_url, acl, 'acl', self.logger) |
+ self._AssertHasNo(acl, 'READER', 'AllAuthenticatedUsers') |
+ |
+ def testAclDelWithUser(self): |
+ add = aclhelpers.AclChange(self.USER_TEST_ADDRESS + ':READ', |
+ scope_type=aclhelpers.ChangeType.USER) |
+ acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) |
+ add.Execute(self.sample_url, acl, 'acl', self.logger) |
+ self._AssertHas(acl, 'READER', 'UserByEmail', self.USER_TEST_ADDRESS) |
+ |
+ remove = aclhelpers.AclDel(self.USER_TEST_ADDRESS) |
+ remove.Execute(self.sample_url, acl, 'acl', self.logger) |
+ self._AssertHasNo(acl, 'READ', 'UserByEmail', self.USER_TEST_ADDRESS) |
+ |
+ def testAclDelWithProjectOwners(self): |
+ add = aclhelpers.AclChange(self._project_test_acl + ':READ', |
+ scope_type=aclhelpers.ChangeType.PROJECT) |
+ acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) |
+ add.Execute(self.sample_url, acl, 'acl', self.logger) |
+ self._AssertHas(acl, 'READER', 'Project', self._project_test_acl) |
+ |
+ remove = aclhelpers.AclDel(self._project_test_acl) |
+ remove.Execute(self.sample_url, acl, 'acl', self.logger) |
+ self._AssertHasNo(acl, 'READ', 'Project', self._project_test_acl) |
+ |
+ def testAclDelWithGroup(self): |
+ add = aclhelpers.AclChange(self.USER_TEST_ADDRESS + ':READ', |
+ scope_type=aclhelpers.ChangeType.GROUP) |
+ acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) |
+ add.Execute(self.sample_url, acl, 'acl', self.logger) |
+ self._AssertHas(acl, 'READER', 'GroupByEmail', self.USER_TEST_ADDRESS) |
+ |
+ remove = aclhelpers.AclDel(self.USER_TEST_ADDRESS) |
+ remove.Execute(self.sample_url, acl, 'acl', self.logger) |
+ self._AssertHasNo(acl, 'READER', 'GroupByEmail', self.GROUP_TEST_ADDRESS) |
+ |
+ # |
+ # Here are a whole lot of verbose asserts |
+ # |
+ |
+ def _AssertHas(self, current_acl, perm, scope, value=None): |
+ matches = list(self._YieldMatchingEntriesJson(current_acl, perm, scope, |
+ value)) |
+ self.assertEqual(1, len(matches)) |
+ |
+ def _AssertHasNo(self, current_acl, perm, scope, value=None): |
+ matches = list(self._YieldMatchingEntriesJson(current_acl, perm, scope, |
+ value)) |
+ self.assertEqual(0, len(matches)) |
+ |
+ def _YieldMatchingEntriesJson(self, current_acl, perm, scope, value=None): |
+ """Generator that yields entries that match the change descriptor. |
+ |
+ Args: |
+ current_acl: A list of apitools_messages.BucketAccessControls or |
+ ObjectAccessControls which will be searched for matching |
+ entries. |
+ perm: Role (permission) to match. |
+ scope: Scope type to match. |
+ value: Value to match (against the scope type). |
+ |
+ Yields: |
+ An apitools_messages.BucketAccessControl or ObjectAccessControl. |
+ """ |
+ for entry in current_acl: |
+ if (scope in ['UserById', 'GroupById'] and |
+ entry.entityId and value == entry.entityId and |
+ entry.role == perm): |
+ yield entry |
+ elif (scope in ['UserByEmail', 'GroupByEmail'] and |
+ entry.email and value == entry.email and |
+ entry.role == perm): |
+ yield entry |
+ elif (scope == 'GroupByDomain' and |
+ entry.domain and value == entry.domain and |
+ entry.role == perm): |
+ yield entry |
+ elif (scope == 'Project' and entry.role == perm and |
+ value == entry.entityId): |
+ yield entry |
+ elif (scope in ['AllUsers', 'AllAuthenticatedUsers'] and |
+ entry.entity.lower() == scope.lower() and |
+ entry.role == perm): |
+ yield entry |
+ |
+ def _MakeScopeRegex(self, role, entity_type, email_address): |
+ template_regex = (r'\{.*"entity":\s*"%s-%s".*"role":\s*"%s".*\}' % |
+ (entity_type, email_address, role)) |
+ return re.compile(template_regex, flags=re.DOTALL) |
+ |
+ def _MakeProjectScopeRegex(self, role, project_team): |
+ template_regex = (r'\{.*"entity":\s*"project-%s-\d+",\s*"projectTeam":\s*' |
+ r'\{\s*"projectNumber":\s*"(\d+)",\s*"team":\s*"%s"\s*\},' |
+ r'\s*"role":\s*"%s".*\}') % (project_team, project_team, |
+ role) |
+ |
+ return re.compile(template_regex, flags=re.DOTALL) |
+ |
+ def testBucketAclChange(self): |
+ """Tests acl change on a bucket.""" |
+ test_regex = self._MakeScopeRegex( |
+ 'OWNER', 'user', self.USER_TEST_ADDRESS) |
+ json_text = self.RunGsUtil( |
+ self._get_acl_prefix + [suri(self.sample_uri)], return_stdout=True) |
+ self.assertNotRegexpMatches(json_text, test_regex) |
+ |
+ self.RunGsUtil(self._ch_acl_prefix + |
+ ['-u', self.USER_TEST_ADDRESS+':fc', suri(self.sample_uri)]) |
+ json_text = self.RunGsUtil( |
+ self._get_acl_prefix + [suri(self.sample_uri)], return_stdout=True) |
+ self.assertRegexpMatches(json_text, test_regex) |
+ |
+ test_regex2 = self._MakeScopeRegex( |
+ 'WRITER', 'user', self.USER_TEST_ADDRESS) |
+ self.RunGsUtil(self._ch_acl_prefix + |
+ ['-u', self.USER_TEST_ADDRESS+':w', suri(self.sample_uri)]) |
+ json_text2 = self.RunGsUtil( |
+ self._get_acl_prefix + [suri(self.sample_uri)], return_stdout=True) |
+ self.assertRegexpMatches(json_text2, test_regex2) |
+ |
+ self.RunGsUtil(self._ch_acl_prefix + |
+ ['-d', self.USER_TEST_ADDRESS, suri(self.sample_uri)]) |
+ |
+ json_text3 = self.RunGsUtil( |
+ self._get_acl_prefix + [suri(self.sample_uri)], return_stdout=True) |
+ self.assertNotRegexpMatches(json_text3, test_regex) |
+ |
+ def testProjectAclChangesOnBucket(self): |
+ """Tests project entity acl changes on a bucket.""" |
+ |
+ if self.test_api == ApiSelector.XML: |
+ stderr = self.RunGsUtil(self._ch_acl_prefix + |
+ ['-p', self._project_test_acl +':w', |
+ suri(self.sample_uri)], |
+ expected_status=1, |
+ return_stderr=True) |
+ self.assertIn(('CommandException: XML API does not support project' |
+ ' scopes, cannot translate ACL.'), stderr) |
+ else: |
+ test_regex = self._MakeProjectScopeRegex( |
+ 'WRITER', self._project_team) |
+ self.RunGsUtil(self._ch_acl_prefix + |
+ ['-p', self._project_test_acl +':w', |
+ suri(self.sample_uri)]) |
+ json_text = self.RunGsUtil( |
+ self._get_acl_prefix + [suri(self.sample_uri)], return_stdout=True) |
+ |
+ self.assertRegexpMatches(json_text, test_regex) |
+ |
+ # The api will accept string project ids, but stores the numeric project |
+ # ids internally, this extracts the numeric id from the returned acls. |
+ proj_num_id = test_regex.search(json_text).group(1) |
+ acl_to_remove = '%s-%s' % (self._project_team, proj_num_id) |
+ |
+ self.RunGsUtil(self._ch_acl_prefix + |
+ ['-d', acl_to_remove, suri(self.sample_uri)]) |
+ |
+ json_text2 = self.RunGsUtil( |
+ self._get_acl_prefix + [suri(self.sample_uri)], return_stdout=True) |
+ self.assertNotRegexpMatches(json_text2, test_regex) |
+ |
+ def testObjectAclChange(self): |
+ """Tests acl change on an object.""" |
+ obj = self.CreateObject(bucket_uri=self.sample_uri, contents='something') |
+ self.AssertNObjectsInBucket(self.sample_uri, 1) |
+ |
+ test_regex = self._MakeScopeRegex( |
+ 'READER', 'group', self.GROUP_TEST_ADDRESS) |
+ json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], |
+ return_stdout=True) |
+ self.assertNotRegexpMatches(json_text, test_regex) |
+ |
+ self.RunGsUtil(self._ch_acl_prefix + |
+ ['-g', self.GROUP_TEST_ADDRESS+':READ', suri(obj)]) |
+ json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], |
+ return_stdout=True) |
+ self.assertRegexpMatches(json_text, test_regex) |
+ |
+ test_regex2 = self._MakeScopeRegex( |
+ 'OWNER', 'group', self.GROUP_TEST_ADDRESS) |
+ self.RunGsUtil(self._ch_acl_prefix + |
+ ['-g', self.GROUP_TEST_ADDRESS+':OWNER', suri(obj)]) |
+ json_text2 = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], |
+ return_stdout=True) |
+ self.assertRegexpMatches(json_text2, test_regex2) |
+ |
+ self.RunGsUtil(self._ch_acl_prefix + |
+ ['-d', self.GROUP_TEST_ADDRESS, suri(obj)]) |
+ json_text3 = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], |
+ return_stdout=True) |
+ self.assertNotRegexpMatches(json_text3, test_regex2) |
+ |
+ all_auth_regex = re.compile( |
+ r'\{.*"entity":\s*"allAuthenticatedUsers".*"role":\s*"OWNER".*\}', |
+ flags=re.DOTALL) |
+ |
+ self.RunGsUtil(self._ch_acl_prefix + ['-g', 'AllAuth:O', suri(obj)]) |
+ json_text4 = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], |
+ return_stdout=True) |
+ self.assertRegexpMatches(json_text4, all_auth_regex) |
+ |
+ def testObjectAclChangeAllUsers(self): |
+ """Tests acl ch AllUsers:R on an object.""" |
+ obj = self.CreateObject(bucket_uri=self.sample_uri, contents='something') |
+ self.AssertNObjectsInBucket(self.sample_uri, 1) |
+ |
+ all_users_regex = re.compile( |
+ r'\{.*"entity":\s*"allUsers".*"role":\s*"READER".*\}', flags=re.DOTALL) |
+ json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], |
+ return_stdout=True) |
+ self.assertNotRegexpMatches(json_text, all_users_regex) |
+ |
+ self.RunGsUtil(self._ch_acl_prefix + |
+ ['-g', 'AllUsers:R', suri(obj)]) |
+ json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], |
+ return_stdout=True) |
+ self.assertRegexpMatches(json_text, all_users_regex) |
+ |
+ def testMultithreadedAclChange(self, count=10): |
+ """Tests multi-threaded acl changing on several objects.""" |
+ objects = [] |
+ for i in range(count): |
+ objects.append(self.CreateObject( |
+ bucket_uri=self.sample_uri, |
+ contents='something {0}'.format(i))) |
+ |
+ self.AssertNObjectsInBucket(self.sample_uri, count) |
+ |
+ test_regex = self._MakeScopeRegex( |
+ 'READER', 'group', self.GROUP_TEST_ADDRESS) |
+ json_texts = [] |
+ for obj in objects: |
+ json_texts.append(self.RunGsUtil( |
+ self._get_acl_prefix + [suri(obj)], return_stdout=True)) |
+ for json_text in json_texts: |
+ self.assertNotRegexpMatches(json_text, test_regex) |
+ |
+ uris = [suri(obj) for obj in objects] |
+ self.RunGsUtil(['-m', '-DD'] + self._ch_acl_prefix + |
+ ['-g', self.GROUP_TEST_ADDRESS+':READ'] + uris) |
+ |
+ json_texts = [] |
+ for obj in objects: |
+ json_texts.append(self.RunGsUtil( |
+ self._get_acl_prefix + [suri(obj)], return_stdout=True)) |
+ for json_text in json_texts: |
+ self.assertRegexpMatches(json_text, test_regex) |
+ |
+ def testRecursiveChangeAcl(self): |
+ """Tests recursively changing ACLs on nested objects.""" |
+ obj = self.CreateObject(bucket_uri=self.sample_uri, object_name='foo/bar', |
+ contents='something') |
+ self.AssertNObjectsInBucket(self.sample_uri, 1) |
+ |
+ test_regex = self._MakeScopeRegex( |
+ 'READER', 'group', self.GROUP_TEST_ADDRESS) |
+ json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], |
+ return_stdout=True) |
+ self.assertNotRegexpMatches(json_text, test_regex) |
+ |
+ @Retry(AssertionError, tries=5, timeout_secs=1) |
+ def _AddAcl(): |
+ self.RunGsUtil( |
+ self._ch_acl_prefix + |
+ ['-R', '-g', self.GROUP_TEST_ADDRESS+':READ', suri(obj)[:-3]]) |
+ json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], |
+ return_stdout=True) |
+ self.assertRegexpMatches(json_text, test_regex) |
+ _AddAcl() |
+ |
+ @Retry(AssertionError, tries=5, timeout_secs=1) |
+ def _DeleteAcl(): |
+ self.RunGsUtil(self._ch_acl_prefix + |
+ ['-d', self.GROUP_TEST_ADDRESS, suri(obj)]) |
+ json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], |
+ return_stdout=True) |
+ self.assertNotRegexpMatches(json_text, test_regex) |
+ _DeleteAcl() |
+ |
+ def testMultiVersionSupport(self): |
+ """Tests changing ACLs on multiple object versions.""" |
+ bucket = self.CreateVersionedBucket() |
+ object_name = self.MakeTempName('obj') |
+ self.CreateObject( |
+ bucket_uri=bucket, object_name=object_name, contents='One thing') |
+ # Create another on the same URI, giving us a second version. |
+ self.CreateObject( |
+ bucket_uri=bucket, object_name=object_name, contents='Another thing') |
+ |
+ lines = self.AssertNObjectsInBucket(bucket, 2, versioned=True) |
+ |
+ obj_v1, obj_v2 = lines[0], lines[1] |
+ |
+ test_regex = self._MakeScopeRegex( |
+ 'READER', 'group', self.GROUP_TEST_ADDRESS) |
+ json_text = self.RunGsUtil(self._get_acl_prefix + [obj_v1], |
+ return_stdout=True) |
+ self.assertNotRegexpMatches(json_text, test_regex) |
+ |
+ self.RunGsUtil(self._ch_acl_prefix + |
+ ['-g', self.GROUP_TEST_ADDRESS+':READ', obj_v1]) |
+ json_text = self.RunGsUtil(self._get_acl_prefix + [obj_v1], |
+ return_stdout=True) |
+ self.assertRegexpMatches(json_text, test_regex) |
+ |
+ json_text = self.RunGsUtil(self._get_acl_prefix + [obj_v2], |
+ return_stdout=True) |
+ self.assertNotRegexpMatches(json_text, test_regex) |
+ |
+ def testBadRequestAclChange(self): |
+ stdout, stderr = self.RunGsUtil( |
+ self._ch_acl_prefix + |
+ ['-u', 'invalid_$$@hello.com:R', suri(self.sample_uri)], |
+ return_stdout=True, return_stderr=True, expected_status=1) |
+ self.assertIn('BadRequestException', stderr) |
+ self.assertNotIn('Retrying', stdout) |
+ self.assertNotIn('Retrying', stderr) |
+ |
+ def testAclGetWithoutFullControl(self): |
+ object_uri = self.CreateObject(contents='foo') |
+ with self.SetAnonymousBotoCreds(): |
+ stderr = self.RunGsUtil(self._get_acl_prefix + [suri(object_uri)], |
+ return_stderr=True, expected_status=1) |
+ self.assertIn('AccessDeniedException', stderr) |
+ |
+ def testTooFewArgumentsFails(self): |
+ """Tests calling ACL commands with insufficient number of arguments.""" |
+ # No arguments for get, but valid subcommand. |
+ stderr = self.RunGsUtil(self._get_acl_prefix, return_stderr=True, |
+ expected_status=1) |
+ self.assertIn('command requires at least', stderr) |
+ |
+ # No arguments for set, but valid subcommand. |
+ stderr = self.RunGsUtil(self._set_acl_prefix, return_stderr=True, |
+ expected_status=1) |
+ self.assertIn('command requires at least', stderr) |
+ |
+ # No arguments for ch, but valid subcommand. |
+ stderr = self.RunGsUtil(self._ch_acl_prefix, return_stderr=True, |
+ expected_status=1) |
+ self.assertIn('command requires at least', stderr) |
+ |
+ # Neither arguments nor subcommand. |
+ stderr = self.RunGsUtil(['acl'], return_stderr=True, expected_status=1) |
+ self.assertIn('command requires at least', stderr) |
+ |
+ def testMinusF(self): |
+ """Tests -f option to continue after failure.""" |
+ bucket_uri = self.CreateBucket() |
+ obj_uri = suri(self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
+ contents='foo')) |
+ acl_string = self.RunGsUtil(self._get_acl_prefix + [obj_uri], |
+ return_stdout=True) |
+ self.RunGsUtil(self._set_acl_prefix + |
+ ['-f', 'public-read', suri(bucket_uri) + 'foo2', obj_uri], |
+ expected_status=1) |
+ acl_string2 = self.RunGsUtil(self._get_acl_prefix + [obj_uri], |
+ return_stdout=True) |
+ |
+ self.assertNotEqual(acl_string, acl_string2) |
+ |
+ |
+class TestS3CompatibleAcl(TestAclBase): |
+ """ACL integration tests that work for s3 and gs URLs.""" |
+ |
+ def testAclObjectGetSet(self): |
+ bucket_uri = self.CreateBucket() |
+ obj_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo') |
+ self.AssertNObjectsInBucket(bucket_uri, 1) |
+ |
+ stdout = self.RunGsUtil(self._get_acl_prefix + [suri(obj_uri)], |
+ return_stdout=True) |
+ set_contents = self.CreateTempFile(contents=stdout) |
+ self.RunGsUtil(self._set_acl_prefix + [set_contents, suri(obj_uri)]) |
+ |
+ def testAclBucketGetSet(self): |
+ bucket_uri = self.CreateBucket() |
+ stdout = self.RunGsUtil(self._get_acl_prefix + [suri(bucket_uri)], |
+ return_stdout=True) |
+ set_contents = self.CreateTempFile(contents=stdout) |
+ self.RunGsUtil(self._set_acl_prefix + [set_contents, suri(bucket_uri)]) |
+ |
+ |
+@SkipForGS('S3 ACLs accept XML and should not cause an XML warning.') |
+class TestS3OnlyAcl(TestAclBase): |
+ """ACL integration tests that work only for s3 URLs.""" |
+ |
+ # TODO: Format all test case names consistently. |
+ def test_set_xml_acl(self): |
+ """Ensures XML content does not return an XML warning for S3.""" |
+ obj_uri = suri(self.CreateObject(contents='foo')) |
+ inpath = self.CreateTempFile(contents='<ValidXml></ValidXml>') |
+ stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, obj_uri], |
+ return_stderr=True, expected_status=1) |
+ self.assertIn('BadRequestException', stderr) |
+ self.assertNotIn('XML ACL data provided', stderr) |
+ |
+ def test_set_xml_acl_bucket(self): |
+ """Ensures XML content does not return an XML warning for S3.""" |
+ bucket_uri = suri(self.CreateBucket()) |
+ inpath = self.CreateTempFile(contents='<ValidXml></ValidXml>') |
+ stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, bucket_uri], |
+ return_stderr=True, expected_status=1) |
+ self.assertIn('BadRequestException', stderr) |
+ self.assertNotIn('XML ACL data provided', stderr) |
+ |
+ |
+class TestAclOldAlias(TestAcl): |
+ _set_acl_prefix = ['setacl'] |
+ _get_acl_prefix = ['getacl'] |
+ _set_defacl_prefix = ['setdefacl'] |
+ _ch_acl_prefix = ['chacl'] |