Index: tools/telemetry/third_party/gsutil/gslib/tests/test_acl.py |
diff --git a/tools/telemetry/third_party/gsutil/gslib/tests/test_acl.py b/tools/telemetry/third_party/gsutil/gslib/tests/test_acl.py |
deleted file mode 100644 |
index 79c2c5b982e0ad1b07abbf71729d25b06ef14a4f..0000000000000000000000000000000000000000 |
--- a/tools/telemetry/third_party/gsutil/gslib/tests/test_acl.py |
+++ /dev/null |
@@ -1,681 +0,0 @@ |
-# -*- coding: utf-8 -*- |
-# Copyright 2013 Google Inc. All Rights Reserved. |
-# |
-# Licensed under the Apache License, Version 2.0 (the "License"); |
-# you may not use this file except in compliance with the License. |
-# You may obtain a copy of the License at |
-# |
-# http://www.apache.org/licenses/LICENSE-2.0 |
-# |
-# Unless required by applicable law or agreed to in writing, software |
-# distributed under the License is distributed on an "AS IS" BASIS, |
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
-# See the License for the specific language governing permissions and |
-# limitations under the License. |
-"""Integration tests for the acl command.""" |
- |
-from __future__ import absolute_import |
- |
-import re |
- |
-from gslib import aclhelpers |
-from gslib.command import CreateGsutilLogger |
-from gslib.cs_api_map import ApiSelector |
-from gslib.project_id import PopulateProjectId |
-from gslib.storage_url import StorageUrlFromString |
-import gslib.tests.testcase as testcase |
-from gslib.tests.testcase.integration_testcase import SkipForGS |
-from gslib.tests.testcase.integration_testcase import SkipForS3 |
-from gslib.tests.util import ObjectToURI as suri |
-from gslib.translation_helper import AclTranslation |
-from gslib.util import Retry |
- |
-PUBLIC_READ_JSON_ACL_TEXT = '"entity":"allUsers","role":"READER"' |
- |
- |
-class TestAclBase(testcase.GsUtilIntegrationTestCase): |
- """Integration test case base class for acl command.""" |
- |
- _set_acl_prefix = ['acl', 'set'] |
- _get_acl_prefix = ['acl', 'get'] |
- _set_defacl_prefix = ['defacl', 'set'] |
- _ch_acl_prefix = ['acl', 'ch'] |
- |
- _project_team = 'owners' |
- _project_test_acl = '%s-%s' % (_project_team, PopulateProjectId()) |
- |
- |
-@SkipForS3('Tests use GS ACL model.') |
-class TestAcl(TestAclBase): |
- """Integration tests for acl command.""" |
- |
- def setUp(self): |
- super(TestAcl, self).setUp() |
- self.sample_uri = self.CreateBucket() |
- self.sample_url = StorageUrlFromString(str(self.sample_uri)) |
- self.logger = CreateGsutilLogger('acl') |
- |
- def test_set_invalid_acl_object(self): |
- """Ensures that invalid content returns a bad request error.""" |
- obj_uri = suri(self.CreateObject(contents='foo')) |
- inpath = self.CreateTempFile(contents='badAcl') |
- stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, obj_uri], |
- return_stderr=True, expected_status=1) |
- self.assertIn('ArgumentException', stderr) |
- |
- def test_set_invalid_acl_bucket(self): |
- """Ensures that invalid content returns a bad request error.""" |
- bucket_uri = suri(self.CreateBucket()) |
- inpath = self.CreateTempFile(contents='badAcl') |
- stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, bucket_uri], |
- return_stderr=True, expected_status=1) |
- self.assertIn('ArgumentException', stderr) |
- |
- def test_set_xml_acl_json_api_object(self): |
- """Ensures XML content returns a bad request error and migration warning.""" |
- obj_uri = suri(self.CreateObject(contents='foo')) |
- inpath = self.CreateTempFile(contents='<ValidXml></ValidXml>') |
- stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, obj_uri], |
- return_stderr=True, expected_status=1) |
- self.assertIn('ArgumentException', stderr) |
- self.assertIn('XML ACL data provided', stderr) |
- |
- def test_set_xml_acl_json_api_bucket(self): |
- """Ensures XML content returns a bad request error and migration warning.""" |
- bucket_uri = suri(self.CreateBucket()) |
- inpath = self.CreateTempFile(contents='<ValidXml></ValidXml>') |
- stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, bucket_uri], |
- return_stderr=True, expected_status=1) |
- self.assertIn('ArgumentException', stderr) |
- self.assertIn('XML ACL data provided', stderr) |
- |
- def test_set_valid_acl_object(self): |
- """Tests setting a valid ACL on an object.""" |
- obj_uri = suri(self.CreateObject(contents='foo')) |
- acl_string = self.RunGsUtil(self._get_acl_prefix + [obj_uri], |
- return_stdout=True) |
- inpath = self.CreateTempFile(contents=acl_string) |
- self.RunGsUtil(self._set_acl_prefix + ['public-read', obj_uri]) |
- acl_string2 = self.RunGsUtil(self._get_acl_prefix + [obj_uri], |
- return_stdout=True) |
- self.RunGsUtil(self._set_acl_prefix + [inpath, obj_uri]) |
- acl_string3 = self.RunGsUtil(self._get_acl_prefix + [obj_uri], |
- return_stdout=True) |
- |
- self.assertNotEqual(acl_string, acl_string2) |
- self.assertEqual(acl_string, acl_string3) |
- |
- def test_set_valid_permission_whitespace_object(self): |
- """Ensures that whitespace is allowed in role and entity elements.""" |
- obj_uri = suri(self.CreateObject(contents='foo')) |
- acl_string = self.RunGsUtil(self._get_acl_prefix + [obj_uri], |
- return_stdout=True) |
- acl_string = re.sub(r'"role"', r'"role" \n', acl_string) |
- acl_string = re.sub(r'"entity"', r'\n "entity"', acl_string) |
- inpath = self.CreateTempFile(contents=acl_string) |
- |
- self.RunGsUtil(self._set_acl_prefix + [inpath, obj_uri]) |
- |
- def test_set_valid_acl_bucket(self): |
- """Ensures that valid canned and XML ACLs work with get/set.""" |
- bucket_uri = suri(self.CreateBucket()) |
- acl_string = self.RunGsUtil(self._get_acl_prefix + [bucket_uri], |
- return_stdout=True) |
- inpath = self.CreateTempFile(contents=acl_string) |
- self.RunGsUtil(self._set_acl_prefix + ['public-read', bucket_uri]) |
- acl_string2 = self.RunGsUtil(self._get_acl_prefix + [bucket_uri], |
- return_stdout=True) |
- self.RunGsUtil(self._set_acl_prefix + [inpath, bucket_uri]) |
- acl_string3 = self.RunGsUtil(self._get_acl_prefix + [bucket_uri], |
- return_stdout=True) |
- |
- self.assertNotEqual(acl_string, acl_string2) |
- self.assertEqual(acl_string, acl_string3) |
- |
- def test_invalid_canned_acl_object(self): |
- """Ensures that an invalid canned ACL returns a CommandException.""" |
- obj_uri = suri(self.CreateObject(contents='foo')) |
- stderr = self.RunGsUtil( |
- self._set_acl_prefix + ['not-a-canned-acl', obj_uri], |
- return_stderr=True, expected_status=1) |
- self.assertIn('CommandException', stderr) |
- self.assertIn('Invalid canned ACL', stderr) |
- |
- def test_set_valid_def_acl_bucket(self): |
- """Ensures that valid default canned and XML ACLs works with get/set.""" |
- bucket_uri = self.CreateBucket() |
- |
- # Default ACL is project private. |
- obj_uri1 = suri(self.CreateObject(bucket_uri=bucket_uri, contents='foo')) |
- acl_string = self.RunGsUtil(self._get_acl_prefix + [obj_uri1], |
- return_stdout=True) |
- |
- # Change it to authenticated-read. |
- self.RunGsUtil( |
- self._set_defacl_prefix + ['authenticated-read', suri(bucket_uri)]) |
- obj_uri2 = suri(self.CreateObject(bucket_uri=bucket_uri, contents='foo2')) |
- acl_string2 = self.RunGsUtil(self._get_acl_prefix + [obj_uri2], |
- return_stdout=True) |
- |
- # Now change it back to the default via XML. |
- inpath = self.CreateTempFile(contents=acl_string) |
- self.RunGsUtil(self._set_defacl_prefix + [inpath, suri(bucket_uri)]) |
- obj_uri3 = suri(self.CreateObject(bucket_uri=bucket_uri, contents='foo3')) |
- acl_string3 = self.RunGsUtil(self._get_acl_prefix + [obj_uri3], |
- return_stdout=True) |
- |
- self.assertNotEqual(acl_string, acl_string2) |
- self.assertIn('allAuthenticatedUsers', acl_string2) |
- self.assertEqual(acl_string, acl_string3) |
- |
- def test_acl_set_version_specific_uri(self): |
- """Tests setting an ACL on a specific version of an object.""" |
- bucket_uri = self.CreateVersionedBucket() |
- # Create initial object version. |
- uri = self.CreateObject(bucket_uri=bucket_uri, contents='data') |
- # Create a second object version. |
- inpath = self.CreateTempFile(contents='def') |
- self.RunGsUtil(['cp', inpath, uri.uri]) |
- |
- # Find out the two object version IDs. |
- lines = self.AssertNObjectsInBucket(bucket_uri, 2, versioned=True) |
- v0_uri_str, v1_uri_str = lines[0], lines[1] |
- |
- # Check that neither version currently has public-read permission |
- # (default ACL is project-private). |
- orig_acls = [] |
- for uri_str in (v0_uri_str, v1_uri_str): |
- acl = self.RunGsUtil(self._get_acl_prefix + [uri_str], |
- return_stdout=True) |
- self.assertNotIn(PUBLIC_READ_JSON_ACL_TEXT, |
- self._strip_json_whitespace(acl)) |
- orig_acls.append(acl) |
- |
- # Set the ACL for the older version of the object to public-read. |
- self.RunGsUtil(self._set_acl_prefix + ['public-read', v0_uri_str]) |
- # Check that the older version's ACL is public-read, but newer version |
- # is not. |
- acl = self.RunGsUtil(self._get_acl_prefix + [v0_uri_str], |
- return_stdout=True) |
- self.assertIn(PUBLIC_READ_JSON_ACL_TEXT, self._strip_json_whitespace(acl)) |
- acl = self.RunGsUtil(self._get_acl_prefix + [v1_uri_str], |
- return_stdout=True) |
- self.assertNotIn(PUBLIC_READ_JSON_ACL_TEXT, |
- self._strip_json_whitespace(acl)) |
- |
- # Check that reading the ACL with the version-less URI returns the |
- # original ACL (since the version-less URI means the current version). |
- acl = self.RunGsUtil(self._get_acl_prefix + [uri.uri], return_stdout=True) |
- self.assertEqual(acl, orig_acls[0]) |
- |
- def _strip_json_whitespace(self, json_text): |
- return re.sub(r'\s*', '', json_text) |
- |
- def testAclChangeWithUserId(self): |
- change = aclhelpers.AclChange(self.USER_TEST_ID + ':r', |
- scope_type=aclhelpers.ChangeType.USER) |
- acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) |
- change.Execute(self.sample_url, acl, 'acl', self.logger) |
- self._AssertHas(acl, 'READER', 'UserById', self.USER_TEST_ID) |
- |
- def testAclChangeWithGroupId(self): |
- change = aclhelpers.AclChange(self.GROUP_TEST_ID + ':r', |
- scope_type=aclhelpers.ChangeType.GROUP) |
- acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) |
- change.Execute(self.sample_url, acl, 'acl', self.logger) |
- self._AssertHas(acl, 'READER', 'GroupById', self.GROUP_TEST_ID) |
- |
- def testAclChangeWithUserEmail(self): |
- change = aclhelpers.AclChange(self.USER_TEST_ADDRESS + ':r', |
- scope_type=aclhelpers.ChangeType.USER) |
- acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) |
- change.Execute(self.sample_url, acl, 'acl', self.logger) |
- self._AssertHas(acl, 'READER', 'UserByEmail', self.USER_TEST_ADDRESS) |
- |
- def testAclChangeWithGroupEmail(self): |
- change = aclhelpers.AclChange(self.GROUP_TEST_ADDRESS + ':fc', |
- scope_type=aclhelpers.ChangeType.GROUP) |
- acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) |
- change.Execute(self.sample_url, acl, 'acl', self.logger) |
- self._AssertHas(acl, 'OWNER', 'GroupByEmail', self.GROUP_TEST_ADDRESS) |
- |
- def testAclChangeWithDomain(self): |
- change = aclhelpers.AclChange(self.DOMAIN_TEST + ':READ', |
- scope_type=aclhelpers.ChangeType.GROUP) |
- acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) |
- change.Execute(self.sample_url, acl, 'acl', self.logger) |
- self._AssertHas(acl, 'READER', 'GroupByDomain', self.DOMAIN_TEST) |
- |
- def testAclChangeWithProjectOwners(self): |
- change = aclhelpers.AclChange(self._project_test_acl + ':READ', |
- scope_type=aclhelpers.ChangeType.PROJECT) |
- acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) |
- change.Execute(self.sample_url, acl, 'acl', self.logger) |
- self._AssertHas(acl, 'READER', 'Project', self._project_test_acl) |
- |
- def testAclChangeWithAllUsers(self): |
- change = aclhelpers.AclChange('AllUsers:WRITE', |
- scope_type=aclhelpers.ChangeType.GROUP) |
- acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) |
- change.Execute(self.sample_url, acl, 'acl', self.logger) |
- self._AssertHas(acl, 'WRITER', 'AllUsers') |
- |
- def testAclChangeWithAllAuthUsers(self): |
- change = aclhelpers.AclChange('AllAuthenticatedUsers:READ', |
- scope_type=aclhelpers.ChangeType.GROUP) |
- acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) |
- change.Execute(self.sample_url, acl, 'acl', self.logger) |
- self._AssertHas(acl, 'READER', 'AllAuthenticatedUsers') |
- remove = aclhelpers.AclDel('AllAuthenticatedUsers') |
- remove.Execute(self.sample_url, acl, 'acl', self.logger) |
- self._AssertHasNo(acl, 'READER', 'AllAuthenticatedUsers') |
- |
- def testAclDelWithUser(self): |
- add = aclhelpers.AclChange(self.USER_TEST_ADDRESS + ':READ', |
- scope_type=aclhelpers.ChangeType.USER) |
- acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) |
- add.Execute(self.sample_url, acl, 'acl', self.logger) |
- self._AssertHas(acl, 'READER', 'UserByEmail', self.USER_TEST_ADDRESS) |
- |
- remove = aclhelpers.AclDel(self.USER_TEST_ADDRESS) |
- remove.Execute(self.sample_url, acl, 'acl', self.logger) |
- self._AssertHasNo(acl, 'READ', 'UserByEmail', self.USER_TEST_ADDRESS) |
- |
- def testAclDelWithProjectOwners(self): |
- add = aclhelpers.AclChange(self._project_test_acl + ':READ', |
- scope_type=aclhelpers.ChangeType.PROJECT) |
- acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) |
- add.Execute(self.sample_url, acl, 'acl', self.logger) |
- self._AssertHas(acl, 'READER', 'Project', self._project_test_acl) |
- |
- remove = aclhelpers.AclDel(self._project_test_acl) |
- remove.Execute(self.sample_url, acl, 'acl', self.logger) |
- self._AssertHasNo(acl, 'READ', 'Project', self._project_test_acl) |
- |
- def testAclDelWithGroup(self): |
- add = aclhelpers.AclChange(self.USER_TEST_ADDRESS + ':READ', |
- scope_type=aclhelpers.ChangeType.GROUP) |
- acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) |
- add.Execute(self.sample_url, acl, 'acl', self.logger) |
- self._AssertHas(acl, 'READER', 'GroupByEmail', self.USER_TEST_ADDRESS) |
- |
- remove = aclhelpers.AclDel(self.USER_TEST_ADDRESS) |
- remove.Execute(self.sample_url, acl, 'acl', self.logger) |
- self._AssertHasNo(acl, 'READER', 'GroupByEmail', self.GROUP_TEST_ADDRESS) |
- |
- # |
- # Here are a whole lot of verbose asserts |
- # |
- |
- def _AssertHas(self, current_acl, perm, scope, value=None): |
- matches = list(self._YieldMatchingEntriesJson(current_acl, perm, scope, |
- value)) |
- self.assertEqual(1, len(matches)) |
- |
- def _AssertHasNo(self, current_acl, perm, scope, value=None): |
- matches = list(self._YieldMatchingEntriesJson(current_acl, perm, scope, |
- value)) |
- self.assertEqual(0, len(matches)) |
- |
- def _YieldMatchingEntriesJson(self, current_acl, perm, scope, value=None): |
- """Generator that yields entries that match the change descriptor. |
- |
- Args: |
- current_acl: A list of apitools_messages.BucketAccessControls or |
- ObjectAccessControls which will be searched for matching |
- entries. |
- perm: Role (permission) to match. |
- scope: Scope type to match. |
- value: Value to match (against the scope type). |
- |
- Yields: |
- An apitools_messages.BucketAccessControl or ObjectAccessControl. |
- """ |
- for entry in current_acl: |
- if (scope in ['UserById', 'GroupById'] and |
- entry.entityId and value == entry.entityId and |
- entry.role == perm): |
- yield entry |
- elif (scope in ['UserByEmail', 'GroupByEmail'] and |
- entry.email and value == entry.email and |
- entry.role == perm): |
- yield entry |
- elif (scope == 'GroupByDomain' and |
- entry.domain and value == entry.domain and |
- entry.role == perm): |
- yield entry |
- elif (scope == 'Project' and entry.role == perm and |
- value == entry.entityId): |
- yield entry |
- elif (scope in ['AllUsers', 'AllAuthenticatedUsers'] and |
- entry.entity.lower() == scope.lower() and |
- entry.role == perm): |
- yield entry |
- |
- def _MakeScopeRegex(self, role, entity_type, email_address): |
- template_regex = (r'\{.*"entity":\s*"%s-%s".*"role":\s*"%s".*\}' % |
- (entity_type, email_address, role)) |
- return re.compile(template_regex, flags=re.DOTALL) |
- |
- def _MakeProjectScopeRegex(self, role, project_team): |
- template_regex = (r'\{.*"entity":\s*"project-%s-\d+",\s*"projectTeam":\s*' |
- r'\{\s*"projectNumber":\s*"(\d+)",\s*"team":\s*"%s"\s*\},' |
- r'\s*"role":\s*"%s".*\}') % (project_team, project_team, |
- role) |
- |
- return re.compile(template_regex, flags=re.DOTALL) |
- |
- def testBucketAclChange(self): |
- """Tests acl change on a bucket.""" |
- test_regex = self._MakeScopeRegex( |
- 'OWNER', 'user', self.USER_TEST_ADDRESS) |
- json_text = self.RunGsUtil( |
- self._get_acl_prefix + [suri(self.sample_uri)], return_stdout=True) |
- self.assertNotRegexpMatches(json_text, test_regex) |
- |
- self.RunGsUtil(self._ch_acl_prefix + |
- ['-u', self.USER_TEST_ADDRESS+':fc', suri(self.sample_uri)]) |
- json_text = self.RunGsUtil( |
- self._get_acl_prefix + [suri(self.sample_uri)], return_stdout=True) |
- self.assertRegexpMatches(json_text, test_regex) |
- |
- test_regex2 = self._MakeScopeRegex( |
- 'WRITER', 'user', self.USER_TEST_ADDRESS) |
- self.RunGsUtil(self._ch_acl_prefix + |
- ['-u', self.USER_TEST_ADDRESS+':w', suri(self.sample_uri)]) |
- json_text2 = self.RunGsUtil( |
- self._get_acl_prefix + [suri(self.sample_uri)], return_stdout=True) |
- self.assertRegexpMatches(json_text2, test_regex2) |
- |
- self.RunGsUtil(self._ch_acl_prefix + |
- ['-d', self.USER_TEST_ADDRESS, suri(self.sample_uri)]) |
- |
- json_text3 = self.RunGsUtil( |
- self._get_acl_prefix + [suri(self.sample_uri)], return_stdout=True) |
- self.assertNotRegexpMatches(json_text3, test_regex) |
- |
- def testProjectAclChangesOnBucket(self): |
- """Tests project entity acl changes on a bucket.""" |
- |
- if self.test_api == ApiSelector.XML: |
- stderr = self.RunGsUtil(self._ch_acl_prefix + |
- ['-p', self._project_test_acl +':w', |
- suri(self.sample_uri)], |
- expected_status=1, |
- return_stderr=True) |
- self.assertIn(('CommandException: XML API does not support project' |
- ' scopes, cannot translate ACL.'), stderr) |
- else: |
- test_regex = self._MakeProjectScopeRegex( |
- 'WRITER', self._project_team) |
- self.RunGsUtil(self._ch_acl_prefix + |
- ['-p', self._project_test_acl +':w', |
- suri(self.sample_uri)]) |
- json_text = self.RunGsUtil( |
- self._get_acl_prefix + [suri(self.sample_uri)], return_stdout=True) |
- |
- self.assertRegexpMatches(json_text, test_regex) |
- |
- # The api will accept string project ids, but stores the numeric project |
- # ids internally, this extracts the numeric id from the returned acls. |
- proj_num_id = test_regex.search(json_text).group(1) |
- acl_to_remove = '%s-%s' % (self._project_team, proj_num_id) |
- |
- self.RunGsUtil(self._ch_acl_prefix + |
- ['-d', acl_to_remove, suri(self.sample_uri)]) |
- |
- json_text2 = self.RunGsUtil( |
- self._get_acl_prefix + [suri(self.sample_uri)], return_stdout=True) |
- self.assertNotRegexpMatches(json_text2, test_regex) |
- |
- def testObjectAclChange(self): |
- """Tests acl change on an object.""" |
- obj = self.CreateObject(bucket_uri=self.sample_uri, contents='something') |
- self.AssertNObjectsInBucket(self.sample_uri, 1) |
- |
- test_regex = self._MakeScopeRegex( |
- 'READER', 'group', self.GROUP_TEST_ADDRESS) |
- json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], |
- return_stdout=True) |
- self.assertNotRegexpMatches(json_text, test_regex) |
- |
- self.RunGsUtil(self._ch_acl_prefix + |
- ['-g', self.GROUP_TEST_ADDRESS+':READ', suri(obj)]) |
- json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], |
- return_stdout=True) |
- self.assertRegexpMatches(json_text, test_regex) |
- |
- test_regex2 = self._MakeScopeRegex( |
- 'OWNER', 'group', self.GROUP_TEST_ADDRESS) |
- self.RunGsUtil(self._ch_acl_prefix + |
- ['-g', self.GROUP_TEST_ADDRESS+':OWNER', suri(obj)]) |
- json_text2 = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], |
- return_stdout=True) |
- self.assertRegexpMatches(json_text2, test_regex2) |
- |
- self.RunGsUtil(self._ch_acl_prefix + |
- ['-d', self.GROUP_TEST_ADDRESS, suri(obj)]) |
- json_text3 = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], |
- return_stdout=True) |
- self.assertNotRegexpMatches(json_text3, test_regex2) |
- |
- all_auth_regex = re.compile( |
- r'\{.*"entity":\s*"allAuthenticatedUsers".*"role":\s*"OWNER".*\}', |
- flags=re.DOTALL) |
- |
- self.RunGsUtil(self._ch_acl_prefix + ['-g', 'AllAuth:O', suri(obj)]) |
- json_text4 = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], |
- return_stdout=True) |
- self.assertRegexpMatches(json_text4, all_auth_regex) |
- |
- def testObjectAclChangeAllUsers(self): |
- """Tests acl ch AllUsers:R on an object.""" |
- obj = self.CreateObject(bucket_uri=self.sample_uri, contents='something') |
- self.AssertNObjectsInBucket(self.sample_uri, 1) |
- |
- all_users_regex = re.compile( |
- r'\{.*"entity":\s*"allUsers".*"role":\s*"READER".*\}', flags=re.DOTALL) |
- json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], |
- return_stdout=True) |
- self.assertNotRegexpMatches(json_text, all_users_regex) |
- |
- self.RunGsUtil(self._ch_acl_prefix + |
- ['-g', 'AllUsers:R', suri(obj)]) |
- json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], |
- return_stdout=True) |
- self.assertRegexpMatches(json_text, all_users_regex) |
- |
- def testMultithreadedAclChange(self, count=10): |
- """Tests multi-threaded acl changing on several objects.""" |
- objects = [] |
- for i in range(count): |
- objects.append(self.CreateObject( |
- bucket_uri=self.sample_uri, |
- contents='something {0}'.format(i))) |
- |
- self.AssertNObjectsInBucket(self.sample_uri, count) |
- |
- test_regex = self._MakeScopeRegex( |
- 'READER', 'group', self.GROUP_TEST_ADDRESS) |
- json_texts = [] |
- for obj in objects: |
- json_texts.append(self.RunGsUtil( |
- self._get_acl_prefix + [suri(obj)], return_stdout=True)) |
- for json_text in json_texts: |
- self.assertNotRegexpMatches(json_text, test_regex) |
- |
- uris = [suri(obj) for obj in objects] |
- self.RunGsUtil(['-m', '-DD'] + self._ch_acl_prefix + |
- ['-g', self.GROUP_TEST_ADDRESS+':READ'] + uris) |
- |
- json_texts = [] |
- for obj in objects: |
- json_texts.append(self.RunGsUtil( |
- self._get_acl_prefix + [suri(obj)], return_stdout=True)) |
- for json_text in json_texts: |
- self.assertRegexpMatches(json_text, test_regex) |
- |
- def testRecursiveChangeAcl(self): |
- """Tests recursively changing ACLs on nested objects.""" |
- obj = self.CreateObject(bucket_uri=self.sample_uri, object_name='foo/bar', |
- contents='something') |
- self.AssertNObjectsInBucket(self.sample_uri, 1) |
- |
- test_regex = self._MakeScopeRegex( |
- 'READER', 'group', self.GROUP_TEST_ADDRESS) |
- json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], |
- return_stdout=True) |
- self.assertNotRegexpMatches(json_text, test_regex) |
- |
- @Retry(AssertionError, tries=5, timeout_secs=1) |
- def _AddAcl(): |
- self.RunGsUtil( |
- self._ch_acl_prefix + |
- ['-R', '-g', self.GROUP_TEST_ADDRESS+':READ', suri(obj)[:-3]]) |
- json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], |
- return_stdout=True) |
- self.assertRegexpMatches(json_text, test_regex) |
- _AddAcl() |
- |
- @Retry(AssertionError, tries=5, timeout_secs=1) |
- def _DeleteAcl(): |
- self.RunGsUtil(self._ch_acl_prefix + |
- ['-d', self.GROUP_TEST_ADDRESS, suri(obj)]) |
- json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], |
- return_stdout=True) |
- self.assertNotRegexpMatches(json_text, test_regex) |
- _DeleteAcl() |
- |
- def testMultiVersionSupport(self): |
- """Tests changing ACLs on multiple object versions.""" |
- bucket = self.CreateVersionedBucket() |
- object_name = self.MakeTempName('obj') |
- self.CreateObject( |
- bucket_uri=bucket, object_name=object_name, contents='One thing') |
- # Create another on the same URI, giving us a second version. |
- self.CreateObject( |
- bucket_uri=bucket, object_name=object_name, contents='Another thing') |
- |
- lines = self.AssertNObjectsInBucket(bucket, 2, versioned=True) |
- |
- obj_v1, obj_v2 = lines[0], lines[1] |
- |
- test_regex = self._MakeScopeRegex( |
- 'READER', 'group', self.GROUP_TEST_ADDRESS) |
- json_text = self.RunGsUtil(self._get_acl_prefix + [obj_v1], |
- return_stdout=True) |
- self.assertNotRegexpMatches(json_text, test_regex) |
- |
- self.RunGsUtil(self._ch_acl_prefix + |
- ['-g', self.GROUP_TEST_ADDRESS+':READ', obj_v1]) |
- json_text = self.RunGsUtil(self._get_acl_prefix + [obj_v1], |
- return_stdout=True) |
- self.assertRegexpMatches(json_text, test_regex) |
- |
- json_text = self.RunGsUtil(self._get_acl_prefix + [obj_v2], |
- return_stdout=True) |
- self.assertNotRegexpMatches(json_text, test_regex) |
- |
- def testBadRequestAclChange(self): |
- stdout, stderr = self.RunGsUtil( |
- self._ch_acl_prefix + |
- ['-u', 'invalid_$$@hello.com:R', suri(self.sample_uri)], |
- return_stdout=True, return_stderr=True, expected_status=1) |
- self.assertIn('BadRequestException', stderr) |
- self.assertNotIn('Retrying', stdout) |
- self.assertNotIn('Retrying', stderr) |
- |
- def testAclGetWithoutFullControl(self): |
- object_uri = self.CreateObject(contents='foo') |
- with self.SetAnonymousBotoCreds(): |
- stderr = self.RunGsUtil(self._get_acl_prefix + [suri(object_uri)], |
- return_stderr=True, expected_status=1) |
- self.assertIn('AccessDeniedException', stderr) |
- |
- def testTooFewArgumentsFails(self): |
- """Tests calling ACL commands with insufficient number of arguments.""" |
- # No arguments for get, but valid subcommand. |
- stderr = self.RunGsUtil(self._get_acl_prefix, return_stderr=True, |
- expected_status=1) |
- self.assertIn('command requires at least', stderr) |
- |
- # No arguments for set, but valid subcommand. |
- stderr = self.RunGsUtil(self._set_acl_prefix, return_stderr=True, |
- expected_status=1) |
- self.assertIn('command requires at least', stderr) |
- |
- # No arguments for ch, but valid subcommand. |
- stderr = self.RunGsUtil(self._ch_acl_prefix, return_stderr=True, |
- expected_status=1) |
- self.assertIn('command requires at least', stderr) |
- |
- # Neither arguments nor subcommand. |
- stderr = self.RunGsUtil(['acl'], return_stderr=True, expected_status=1) |
- self.assertIn('command requires at least', stderr) |
- |
- def testMinusF(self): |
- """Tests -f option to continue after failure.""" |
- bucket_uri = self.CreateBucket() |
- obj_uri = suri(self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
- contents='foo')) |
- acl_string = self.RunGsUtil(self._get_acl_prefix + [obj_uri], |
- return_stdout=True) |
- self.RunGsUtil(self._set_acl_prefix + |
- ['-f', 'public-read', suri(bucket_uri) + 'foo2', obj_uri], |
- expected_status=1) |
- acl_string2 = self.RunGsUtil(self._get_acl_prefix + [obj_uri], |
- return_stdout=True) |
- |
- self.assertNotEqual(acl_string, acl_string2) |
- |
- |
-class TestS3CompatibleAcl(TestAclBase): |
- """ACL integration tests that work for s3 and gs URLs.""" |
- |
- def testAclObjectGetSet(self): |
- bucket_uri = self.CreateBucket() |
- obj_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo') |
- self.AssertNObjectsInBucket(bucket_uri, 1) |
- |
- stdout = self.RunGsUtil(self._get_acl_prefix + [suri(obj_uri)], |
- return_stdout=True) |
- set_contents = self.CreateTempFile(contents=stdout) |
- self.RunGsUtil(self._set_acl_prefix + [set_contents, suri(obj_uri)]) |
- |
- def testAclBucketGetSet(self): |
- bucket_uri = self.CreateBucket() |
- stdout = self.RunGsUtil(self._get_acl_prefix + [suri(bucket_uri)], |
- return_stdout=True) |
- set_contents = self.CreateTempFile(contents=stdout) |
- self.RunGsUtil(self._set_acl_prefix + [set_contents, suri(bucket_uri)]) |
- |
- |
-@SkipForGS('S3 ACLs accept XML and should not cause an XML warning.') |
-class TestS3OnlyAcl(TestAclBase): |
- """ACL integration tests that work only for s3 URLs.""" |
- |
- # TODO: Format all test case names consistently. |
- def test_set_xml_acl(self): |
- """Ensures XML content does not return an XML warning for S3.""" |
- obj_uri = suri(self.CreateObject(contents='foo')) |
- inpath = self.CreateTempFile(contents='<ValidXml></ValidXml>') |
- stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, obj_uri], |
- return_stderr=True, expected_status=1) |
- self.assertIn('BadRequestException', stderr) |
- self.assertNotIn('XML ACL data provided', stderr) |
- |
- def test_set_xml_acl_bucket(self): |
- """Ensures XML content does not return an XML warning for S3.""" |
- bucket_uri = suri(self.CreateBucket()) |
- inpath = self.CreateTempFile(contents='<ValidXml></ValidXml>') |
- stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, bucket_uri], |
- return_stderr=True, expected_status=1) |
- self.assertIn('BadRequestException', stderr) |
- self.assertNotIn('XML ACL data provided', stderr) |
- |
- |
-class TestAclOldAlias(TestAcl): |
- _set_acl_prefix = ['setacl'] |
- _get_acl_prefix = ['getacl'] |
- _set_defacl_prefix = ['setdefacl'] |
- _ch_acl_prefix = ['chacl'] |