Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(383)

Unified Diff: gslib/tests/test_acl.py

Issue 698893003: Update checked in version of gsutil to version 4.6 (Closed) Base URL: http://dart.googlecode.com/svn/third_party/gsutil/
Patch Set: Created 6 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « gslib/tests/test_Doption.py ('k') | gslib/tests/test_bucketconfig.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: gslib/tests/test_acl.py
===================================================================
--- gslib/tests/test_acl.py (revision 33376)
+++ gslib/tests/test_acl.py (working copy)
@@ -14,51 +14,77 @@
# limitations under the License.
"""Integration tests for the acl command."""
-import gslib.tests.testcase as testcase
+from __future__ import absolute_import
+
import re
from gslib import aclhelpers
from gslib.command import CreateGsutilLogger
+from gslib.storage_url import StorageUrlFromString
+import gslib.tests.testcase as testcase
+from gslib.tests.testcase.integration_testcase import SkipForGS
+from gslib.tests.testcase.integration_testcase import SkipForS3
from gslib.tests.util import ObjectToURI as suri
-from gslib.tests.util import SetBotoConfigForTest
-from gslib.util import Retry
+from gslib.translation_helper import AclTranslation
-PUBLIC_READ_ACL_TEXT = '<Scope type="AllUsers"/><Permission>READ</Permission>'
+PUBLIC_READ_JSON_ACL_TEXT = '"entity":"allUsers","role":"READER"'
-class TestAcl(testcase.GsUtilIntegrationTestCase):
- """Integration tests for acl command."""
+class TestAclBase(testcase.GsUtilIntegrationTestCase):
+ """Integration test case base class for acl command."""
_set_acl_prefix = ['acl', 'set']
_get_acl_prefix = ['acl', 'get']
_set_defacl_prefix = ['defacl', 'set']
_ch_acl_prefix = ['acl', 'ch']
+
+@SkipForS3('Tests use GS ACL model.')
+class TestAcl(TestAclBase):
+ """Integration tests for acl command."""
+
def setUp(self):
super(TestAcl, self).setUp()
self.sample_uri = self.CreateBucket()
+ self.sample_url = StorageUrlFromString(str(self.sample_uri))
self.logger = CreateGsutilLogger('acl')
def test_set_invalid_acl_object(self):
- """Ensures that invalid XML content returns a MalformedACLError."""
+ """Ensures that invalid content returns a bad request error."""
obj_uri = suri(self.CreateObject(contents='foo'))
- inpath = self.CreateTempFile(contents='badXml')
+ inpath = self.CreateTempFile(contents='badAcl')
stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, obj_uri],
return_stderr=True, expected_status=1)
+ self.assertIn('ArgumentException', stderr)
- self.assertIn('MalformedACLError', stderr)
-
def test_set_invalid_acl_bucket(self):
- """Ensures that invalid XML content returns a MalformedACLError."""
+ """Ensures that invalid content returns a bad request error."""
bucket_uri = suri(self.CreateBucket())
- inpath = self.CreateTempFile(contents='badXml')
+ inpath = self.CreateTempFile(contents='badAcl')
stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, bucket_uri],
return_stderr=True, expected_status=1)
+ self.assertIn('ArgumentException', stderr)
- self.assertIn('MalformedACLError', stderr)
+ def test_set_xml_acl_json_api_object(self):
+ """Ensures XML content returns a bad request error and migration warning."""
+ obj_uri = suri(self.CreateObject(contents='foo'))
+ inpath = self.CreateTempFile(contents='<ValidXml></ValidXml>')
+ stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, obj_uri],
+ return_stderr=True, expected_status=1)
+ self.assertIn('ArgumentException', stderr)
+ self.assertIn('XML ACL data provided', stderr)
+ def test_set_xml_acl_json_api_bucket(self):
+ """Ensures XML content returns a bad request error and migration warning."""
+ bucket_uri = suri(self.CreateBucket())
+ inpath = self.CreateTempFile(contents='<ValidXml></ValidXml>')
+ stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, bucket_uri],
+ return_stderr=True, expected_status=1)
+ self.assertIn('ArgumentException', stderr)
+ self.assertIn('XML ACL data provided', stderr)
+
def test_set_valid_acl_object(self):
- """Ensures that valid canned and XML ACLs work with get/set."""
+ """Tests setting a valid ACL on an object."""
obj_uri = suri(self.CreateObject(contents='foo'))
acl_string = self.RunGsUtil(self._get_acl_prefix + [obj_uri],
return_stdout=True)
@@ -74,14 +100,14 @@
self.assertEqual(acl_string, acl_string3)
def test_set_valid_permission_whitespace_object(self):
- """Ensures that whitespace is allowed in <Permission> elements."""
+ """Ensures that whitespace is allowed in role and entity elements."""
obj_uri = suri(self.CreateObject(contents='foo'))
acl_string = self.RunGsUtil(self._get_acl_prefix + [obj_uri],
return_stdout=True)
- acl_string = re.sub(r'<Permission>', r'<Permission> \n', acl_string)
- acl_string = re.sub(r'</Permission>', r'\n </Permission>', acl_string)
+ acl_string = re.sub(r'"role"', r'"role" \n', acl_string)
+ acl_string = re.sub(r'"entity"', r'\n "entity"', acl_string)
inpath = self.CreateTempFile(contents=acl_string)
-
+
self.RunGsUtil(self._set_acl_prefix + [inpath, obj_uri])
def test_set_valid_acl_bucket(self):
@@ -100,34 +126,12 @@
self.assertNotEqual(acl_string, acl_string2)
self.assertEqual(acl_string, acl_string3)
- def test_get_and_set_valid_object_acl_with_non_ascii_chars(self):
- """Ensures that non-ASCII chars work correctly in ACL handling."""
- acl_entry_str = """
- <Entry>
- <Scope type="UserByEmail">
- <EmailAddress>gs-team@google.com</EmailAddress>
- <Name>%s</Name>
- </Scope>
- <Permission>READ</Permission>
- </Entry>
-"""
- non_ascii_name = u'Test NonAscii łرح안'
- acl_entry_str = acl_entry_str % non_ascii_name
- obj_uri = self.CreateObject(contents='foo')
- stdout = self.RunGsUtil(self._get_acl_prefix + [suri(obj_uri)],
- return_stdout=True)
- new_acl_str = re.sub('</Entries>', '%s</Entries>' % acl_entry_str, stdout)
- acl_path = self.CreateTempFile(contents=new_acl_str.encode('utf-8'))
- self.RunGsUtil(self._set_acl_prefix + [acl_path, suri(obj_uri)])
- res_acl_str = self.RunGsUtil(self._get_acl_prefix + [suri(obj_uri)],
- return_stdout=True)
- self.assertIn(non_ascii_name.encode('utf-8'), res_acl_str)
-
def test_invalid_canned_acl_object(self):
"""Ensures that an invalid canned ACL returns a CommandException."""
obj_uri = suri(self.CreateObject(contents='foo'))
- stderr = self.RunGsUtil(self._set_acl_prefix + ['not-a-canned-acl',
- obj_uri], return_stderr=True, expected_status=1)
+ stderr = self.RunGsUtil(
+ self._set_acl_prefix + ['not-a-canned-acl', obj_uri],
+ return_stderr=True, expected_status=1)
self.assertIn('CommandException', stderr)
self.assertIn('Invalid canned ACL', stderr)
@@ -155,10 +159,11 @@
return_stdout=True)
self.assertNotEqual(acl_string, acl_string2)
- self.assertIn('AllAuthenticatedUsers', acl_string2)
+ self.assertIn('allAuthenticatedUsers', acl_string2)
self.assertEqual(acl_string, acl_string3)
def test_acl_set_version_specific_uri(self):
+ """Tests setting an ACL on a specific version of an object."""
bucket_uri = self.CreateVersionedBucket()
# Create initial object version.
uri = self.CreateObject(bucket_uri=bucket_uri, contents='data')
@@ -167,23 +172,17 @@
self.RunGsUtil(['cp', inpath, uri.uri])
# Find out the two object version IDs.
- # Use @Retry as hedge against bucket listing eventual consistency.
- @Retry(AssertionError, tries=3, timeout_secs=1)
- def _GetVersions():
- stdout = self.RunGsUtil(['ls', '-a', uri.uri], return_stdout=True)
- lines = stdout.split('\n')
- # There should be 3 lines, counting final \n.
- self.assertEqual(len(lines), 3)
- return lines[0], lines[1]
+ lines = self.AssertNObjectsInBucket(bucket_uri, 2, versioned=True)
+ v0_uri_str, v1_uri_str = lines[0], lines[1]
- v0_uri_str, v1_uri_str = _GetVersions()
-
# Check that neither version currently has public-read permission
# (default ACL is project-private).
orig_acls = []
for uri_str in (v0_uri_str, v1_uri_str):
- acl = self.RunGsUtil(self._get_acl_prefix + [uri_str], return_stdout=True)
- self.assertNotIn(PUBLIC_READ_ACL_TEXT, self._strip_xml_whitespace(acl))
+ acl = self.RunGsUtil(self._get_acl_prefix + [uri_str],
+ return_stdout=True)
+ self.assertNotIn(PUBLIC_READ_JSON_ACL_TEXT,
+ self._strip_json_whitespace(acl))
orig_acls.append(acl)
# Set the ACL for the older version of the object to public-read.
@@ -192,261 +191,333 @@
# is not.
acl = self.RunGsUtil(self._get_acl_prefix + [v0_uri_str],
return_stdout=True)
- self.assertIn(PUBLIC_READ_ACL_TEXT, self._strip_xml_whitespace(acl))
+ self.assertIn(PUBLIC_READ_JSON_ACL_TEXT, self._strip_json_whitespace(acl))
acl = self.RunGsUtil(self._get_acl_prefix + [v1_uri_str],
return_stdout=True)
- self.assertNotIn(PUBLIC_READ_ACL_TEXT, self._strip_xml_whitespace(acl))
+ self.assertNotIn(PUBLIC_READ_JSON_ACL_TEXT,
+ self._strip_json_whitespace(acl))
# Check that reading the ACL with the version-less URI returns the
# original ACL (since the version-less URI means the current version).
acl = self.RunGsUtil(self._get_acl_prefix + [uri.uri], return_stdout=True)
self.assertEqual(acl, orig_acls[0])
- def _strip_xml_whitespace(self, xml):
- s = re.sub('>\s*', '>', xml.replace('\n', ''))
- return re.sub('\s*<', '<', s)
-
+ def _strip_json_whitespace(self, json_text):
+ return re.sub(r'\s*', '', json_text)
+
def testAclChangeWithUserId(self):
change = aclhelpers.AclChange(self.USER_TEST_ID + ':r',
scope_type=aclhelpers.ChangeType.USER)
- acl = self.sample_uri.get_acl()
- change.Execute(self.sample_uri, acl, self.logger)
- self._AssertHas(acl, 'READ', 'UserById', self.USER_TEST_ID)
+ acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl()))
+ change.Execute(self.sample_url, acl, 'acl', self.logger)
+ self._AssertHas(acl, 'READER', 'UserById', self.USER_TEST_ID)
def testAclChangeWithGroupId(self):
change = aclhelpers.AclChange(self.GROUP_TEST_ID + ':r',
scope_type=aclhelpers.ChangeType.GROUP)
- acl = self.sample_uri.get_acl()
- change.Execute(self.sample_uri, acl, self.logger)
- self._AssertHas(acl, 'READ', 'GroupById', self.GROUP_TEST_ID)
+ acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl()))
+ change.Execute(self.sample_url, acl, 'acl', self.logger)
+ self._AssertHas(acl, 'READER', 'GroupById', self.GROUP_TEST_ID)
def testAclChangeWithUserEmail(self):
change = aclhelpers.AclChange(self.USER_TEST_ADDRESS + ':r',
scope_type=aclhelpers.ChangeType.USER)
- acl = self.sample_uri.get_acl()
- change.Execute(self.sample_uri, acl, self.logger)
- self._AssertHas(acl, 'READ', 'UserByEmail', self.USER_TEST_ADDRESS)
+ acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl()))
+ change.Execute(self.sample_url, acl, 'acl', self.logger)
+ self._AssertHas(acl, 'READER', 'UserByEmail', self.USER_TEST_ADDRESS)
def testAclChangeWithGroupEmail(self):
change = aclhelpers.AclChange(self.GROUP_TEST_ADDRESS + ':fc',
scope_type=aclhelpers.ChangeType.GROUP)
- acl = self.sample_uri.get_acl()
- change.Execute(self.sample_uri, acl, self.logger)
- self._AssertHas(acl, 'FULL_CONTROL', 'GroupByEmail',
- self.GROUP_TEST_ADDRESS)
+ acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl()))
+ change.Execute(self.sample_url, acl, 'acl', self.logger)
+ self._AssertHas(acl, 'OWNER', 'GroupByEmail', self.GROUP_TEST_ADDRESS)
def testAclChangeWithDomain(self):
change = aclhelpers.AclChange(self.DOMAIN_TEST + ':READ',
scope_type=aclhelpers.ChangeType.GROUP)
- acl = self.sample_uri.get_acl()
- change.Execute(self.sample_uri, acl, self.logger)
- self._AssertHas(acl, 'READ', 'GroupByDomain', self.DOMAIN_TEST)
+ acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl()))
+ change.Execute(self.sample_url, acl, 'acl', self.logger)
+ self._AssertHas(acl, 'READER', 'GroupByDomain', self.DOMAIN_TEST)
def testAclChangeWithAllUsers(self):
change = aclhelpers.AclChange('AllUsers:WRITE',
scope_type=aclhelpers.ChangeType.GROUP)
- acl = self.sample_uri.get_acl()
- change.Execute(self.sample_uri, acl, self.logger)
- self._AssertHas(acl, 'WRITE', 'AllUsers')
+ acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl()))
+ change.Execute(self.sample_url, acl, 'acl', self.logger)
+ self._AssertHas(acl, 'WRITER', 'AllUsers')
def testAclChangeWithAllAuthUsers(self):
change = aclhelpers.AclChange('AllAuthenticatedUsers:READ',
scope_type=aclhelpers.ChangeType.GROUP)
- acl = self.sample_uri.get_acl()
- change.Execute(self.sample_uri, acl, self.logger)
- self._AssertHas(acl, 'READ', 'AllAuthenticatedUsers')
+ acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl()))
+ change.Execute(self.sample_url, acl, 'acl', self.logger)
+ self._AssertHas(acl, 'READER', 'AllAuthenticatedUsers')
remove = aclhelpers.AclDel('AllAuthenticatedUsers')
- remove.Execute(self.sample_uri, acl, self.logger)
- self._AssertHasNo(acl, 'READ', 'AllAuthenticatedUsers')
+ remove.Execute(self.sample_url, acl, 'acl', self.logger)
+ self._AssertHasNo(acl, 'READER', 'AllAuthenticatedUsers')
def testAclDelWithUser(self):
add = aclhelpers.AclChange(self.USER_TEST_ADDRESS + ':READ',
scope_type=aclhelpers.ChangeType.USER)
- acl = self.sample_uri.get_acl()
- add.Execute(self.sample_uri, acl, self.logger)
- self._AssertHas(acl, 'READ', 'UserByEmail', self.USER_TEST_ADDRESS)
+ acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl()))
+ add.Execute(self.sample_url, acl, 'acl', self.logger)
+ self._AssertHas(acl, 'READER', 'UserByEmail', self.USER_TEST_ADDRESS)
remove = aclhelpers.AclDel(self.USER_TEST_ADDRESS)
- remove.Execute(self.sample_uri, acl, self.logger)
+ remove.Execute(self.sample_url, acl, 'acl', self.logger)
self._AssertHasNo(acl, 'READ', 'UserByEmail', self.USER_TEST_ADDRESS)
def testAclDelWithGroup(self):
add = aclhelpers.AclChange(self.USER_TEST_ADDRESS + ':READ',
scope_type=aclhelpers.ChangeType.GROUP)
- acl = self.sample_uri.get_acl()
- add.Execute(self.sample_uri, acl, self.logger)
- self._AssertHas(acl, 'READ', 'GroupByEmail', self.USER_TEST_ADDRESS)
+ acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl()))
+ add.Execute(self.sample_url, acl, 'acl', self.logger)
+ self._AssertHas(acl, 'READER', 'GroupByEmail', self.USER_TEST_ADDRESS)
remove = aclhelpers.AclDel(self.USER_TEST_ADDRESS)
- remove.Execute(self.sample_uri, acl, self.logger)
- self._AssertHasNo(acl, 'READ', 'GroupByEmail', self.GROUP_TEST_ADDRESS)
+ remove.Execute(self.sample_url, acl, 'acl', self.logger)
+ self._AssertHasNo(acl, 'READER', 'GroupByEmail', self.GROUP_TEST_ADDRESS)
#
# Here are a whole lot of verbose asserts
#
def _AssertHas(self, current_acl, perm, scope, value=None):
- matches = list(self._YieldMatchingEntries(current_acl, perm, scope, value))
+ matches = list(self._YieldMatchingEntriesJson(current_acl, perm, scope,
+ value))
self.assertEqual(1, len(matches))
def _AssertHasNo(self, current_acl, perm, scope, value=None):
- matches = list(self._YieldMatchingEntries(current_acl, perm, scope, value))
+ matches = list(self._YieldMatchingEntriesJson(current_acl, perm, scope,
+ value))
self.assertEqual(0, len(matches))
- def _YieldMatchingEntries(self, current_acl, perm, scope, value=None):
- """Generator that finds entries that match the change descriptor."""
- for entry in current_acl.entries.entry_list:
- if entry.scope.type == scope:
- if scope in ['UserById', 'GroupById']:
- if value == entry.scope.id:
- yield entry
- elif scope in ['UserByEmail', 'GroupByEmail']:
- if value == entry.scope.email_address:
- yield entry
- elif scope == 'GroupByDomain':
- if value == entry.scope.domain:
- yield entry
- elif scope in ['AllUsers', 'AllAuthenticatedUsers']:
- yield entry
- else:
- raise Exception('Found an unrecognized ACL entry type, aborting.')
+ def _YieldMatchingEntriesJson(self, current_acl, perm, scope, value=None):
+ """Generator that yields entries that match the change descriptor.
- def _MakeScopeRegex(self, scope_type, email_address, perm):
- template_regex = (
- r'<Scope type="{0}">\s*<EmailAddress>\s*{1}\s*</EmailAddress>\s*'
- r'</Scope>\s*<Permission>\s*{2}\s*</Permission>')
- return template_regex.format(scope_type, email_address, perm)
+ Args:
+ current_acl: A list of apitools_messages.BucketAccessControls or
+ ObjectAccessControls which will be searched for matching
+ entries.
+ perm: Role (permission) to match.
+ scope: Scope type to match.
+ value: Value to match (against the scope type).
+ Yields:
+ An apitools_messages.BucketAccessControl or ObjectAccessControl.
+ """
+ for entry in current_acl:
+ if (scope in ['UserById', 'GroupById'] and
+ entry.entityId and value == entry.entityId and
+ entry.role == perm):
+ yield entry
+ elif (scope in ['UserByEmail', 'GroupByEmail'] and
+ entry.email and value == entry.email and
+ entry.role == perm):
+ yield entry
+ elif (scope == 'GroupByDomain' and
+ entry.domain and value == entry.domain and
+ entry.role == perm):
+ yield entry
+ elif (scope in ['AllUsers', 'AllAuthenticatedUsers'] and
+ entry.entity.lower() == scope.lower() and
+ entry.role == perm):
+ yield entry
+
+ def _MakeScopeRegex(self, role, entity_type, email_address):
+ template_regex = (r'\{.*"entity":\s*"%s-%s".*"role":\s*"%s".*\}' %
+ (entity_type, email_address, role))
+ return re.compile(template_regex, flags=re.DOTALL)
+
def testBucketAclChange(self):
+ """Tests acl change on a bucket."""
test_regex = self._MakeScopeRegex(
- 'UserByEmail', self.USER_TEST_ADDRESS, 'FULL_CONTROL')
- xml = self.RunGsUtil(
+ 'OWNER', 'user', self.USER_TEST_ADDRESS)
+ json_text = self.RunGsUtil(
self._get_acl_prefix + [suri(self.sample_uri)], return_stdout=True)
- self.assertNotRegexpMatches(xml, test_regex)
+ self.assertNotRegexpMatches(json_text, test_regex)
self.RunGsUtil(self._ch_acl_prefix +
['-u', self.USER_TEST_ADDRESS+':fc', suri(self.sample_uri)])
- xml = self.RunGsUtil(
+ json_text = self.RunGsUtil(
self._get_acl_prefix + [suri(self.sample_uri)], return_stdout=True)
- self.assertRegexpMatches(xml, test_regex)
+ self.assertRegexpMatches(json_text, test_regex)
+ test_regex2 = self._MakeScopeRegex(
+ 'WRITER', 'user', self.USER_TEST_ADDRESS)
self.RunGsUtil(self._ch_acl_prefix +
+ ['-u', self.USER_TEST_ADDRESS+':w', suri(self.sample_uri)])
+ json_text2 = self.RunGsUtil(
+ self._get_acl_prefix + [suri(self.sample_uri)], return_stdout=True)
+ self.assertRegexpMatches(json_text2, test_regex2)
+
+ self.RunGsUtil(self._ch_acl_prefix +
['-d', self.USER_TEST_ADDRESS, suri(self.sample_uri)])
- xml = self.RunGsUtil(
+ json_text3 = self.RunGsUtil(
self._get_acl_prefix + [suri(self.sample_uri)], return_stdout=True)
- self.assertNotRegexpMatches(xml, test_regex)
+ self.assertNotRegexpMatches(json_text3, test_regex)
def testObjectAclChange(self):
+ """Tests acl change on an object."""
obj = self.CreateObject(bucket_uri=self.sample_uri, contents='something')
+ self.AssertNObjectsInBucket(self.sample_uri, 1)
+
test_regex = self._MakeScopeRegex(
- 'GroupByEmail', self.GROUP_TEST_ADDRESS, 'READ')
- xml = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], return_stdout=True)
- self.assertNotRegexpMatches(xml, test_regex)
+ 'READER', 'group', self.GROUP_TEST_ADDRESS)
+ json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)],
+ return_stdout=True)
+ self.assertNotRegexpMatches(json_text, test_regex)
self.RunGsUtil(self._ch_acl_prefix +
['-g', self.GROUP_TEST_ADDRESS+':READ', suri(obj)])
- xml = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], return_stdout=True)
- self.assertRegexpMatches(xml, test_regex)
+ json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)],
+ return_stdout=True)
+ self.assertRegexpMatches(json_text, test_regex)
+ test_regex2 = self._MakeScopeRegex(
+ 'OWNER', 'group', self.GROUP_TEST_ADDRESS)
self.RunGsUtil(self._ch_acl_prefix +
+ ['-g', self.GROUP_TEST_ADDRESS+':OWNER', suri(obj)])
+ json_text2 = self.RunGsUtil(self._get_acl_prefix + [suri(obj)],
+ return_stdout=True)
+ self.assertRegexpMatches(json_text2, test_regex2)
+
+ self.RunGsUtil(self._ch_acl_prefix +
['-d', self.GROUP_TEST_ADDRESS, suri(obj)])
- xml = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], return_stdout=True)
- self.assertNotRegexpMatches(xml, test_regex)
+ json_text3 = self.RunGsUtil(self._get_acl_prefix + [suri(obj)],
+ return_stdout=True)
+ self.assertNotRegexpMatches(json_text3, test_regex2)
+ all_auth_regex = re.compile(
+ r'\{.*"entity":\s*"allAuthenticatedUsers".*"role":\s*"OWNER".*\}',
+ flags=re.DOTALL)
+
+ self.RunGsUtil(self._ch_acl_prefix + ['-g', 'AllAuth:O', suri(obj)])
+ json_text4 = self.RunGsUtil(self._get_acl_prefix + [suri(obj)],
+ return_stdout=True)
+ self.assertRegexpMatches(json_text4, all_auth_regex)
+
+ def testObjectAclChangeAllUsers(self):
+ """Tests acl ch AllUsers:R on an object."""
+ obj = self.CreateObject(bucket_uri=self.sample_uri, contents='something')
+ self.AssertNObjectsInBucket(self.sample_uri, 1)
+
+ all_users_regex = re.compile(
+ r'\{.*"entity":\s*"allUsers".*"role":\s*"READER".*\}', flags=re.DOTALL)
+ json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)],
+ return_stdout=True)
+ self.assertNotRegexpMatches(json_text, all_users_regex)
+
+ self.RunGsUtil(self._ch_acl_prefix +
+ ['-g', 'AllUsers:R', suri(obj)])
+ json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)],
+ return_stdout=True)
+ self.assertRegexpMatches(json_text, all_users_regex)
+
def testMultithreadedAclChange(self, count=10):
+ """Tests multi-threaded acl changing on several objects."""
objects = []
for i in range(count):
objects.append(self.CreateObject(
bucket_uri=self.sample_uri,
contents='something {0}'.format(i)))
+ self.AssertNObjectsInBucket(self.sample_uri, count)
+
test_regex = self._MakeScopeRegex(
- 'GroupByEmail', self.GROUP_TEST_ADDRESS, 'READ')
- xmls = []
+ 'READER', 'group', self.GROUP_TEST_ADDRESS)
+ json_texts = []
for obj in objects:
- xmls.append(self.RunGsUtil(self._get_acl_prefix + [suri(obj)],
- return_stdout=True))
- for xml in xmls:
- self.assertNotRegexpMatches(xml, test_regex)
+ json_texts.append(self.RunGsUtil(
+ self._get_acl_prefix + [suri(obj)], return_stdout=True))
+ for json_text in json_texts:
+ self.assertNotRegexpMatches(json_text, test_regex)
uris = [suri(obj) for obj in objects]
- self.RunGsUtil(['-m', '-DD'] + self._ch_acl_prefix + ['-g',
- self.GROUP_TEST_ADDRESS+':READ'] + uris)
+ self.RunGsUtil(['-m', '-DD'] + self._ch_acl_prefix +
+ ['-g', self.GROUP_TEST_ADDRESS+':READ'] + uris)
- xmls = []
+ json_texts = []
for obj in objects:
- xmls.append(self.RunGsUtil(self._get_acl_prefix + [suri(obj)],
- return_stdout=True))
- for xml in xmls:
- self.assertRegexpMatches(xml, test_regex)
+ json_texts.append(self.RunGsUtil(
+ self._get_acl_prefix + [suri(obj)], return_stdout=True))
+ for json_text in json_texts:
+ self.assertRegexpMatches(json_text, test_regex)
def testRecursiveChangeAcl(self):
+ """Tests recursively changing ACLs on nested objects."""
obj = self.CreateObject(bucket_uri=self.sample_uri, object_name='foo/bar',
contents='something')
+ self.AssertNObjectsInBucket(self.sample_uri, 1)
+
test_regex = self._MakeScopeRegex(
- 'GroupByEmail', self.GROUP_TEST_ADDRESS, 'READ')
- xml = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], return_stdout=True)
- self.assertNotRegexpMatches(xml, test_regex)
+ 'READER', 'group', self.GROUP_TEST_ADDRESS)
+ json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)],
+ return_stdout=True)
+ self.assertNotRegexpMatches(json_text, test_regex)
- self.RunGsUtil(self._ch_acl_prefix +
+ self.RunGsUtil(
+ self._ch_acl_prefix +
['-R', '-g', self.GROUP_TEST_ADDRESS+':READ', suri(obj)[:-3]])
- xml = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], return_stdout=True)
- self.assertRegexpMatches(xml, test_regex)
+ json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)],
+ return_stdout=True)
+ self.assertRegexpMatches(json_text, test_regex)
self.RunGsUtil(self._ch_acl_prefix +
['-d', self.GROUP_TEST_ADDRESS, suri(obj)])
- xml = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], return_stdout=True)
- self.assertNotRegexpMatches(xml, test_regex)
+ json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)],
+ return_stdout=True)
+ self.assertNotRegexpMatches(json_text, test_regex)
def testMultiVersionSupport(self):
+ """Tests changing ACLs on multiple object versions."""
bucket = self.CreateVersionedBucket()
object_name = self.MakeTempName('obj')
- obj = self.CreateObject(
+ self.CreateObject(
bucket_uri=bucket, object_name=object_name, contents='One thing')
# Create another on the same URI, giving us a second version.
self.CreateObject(
bucket_uri=bucket, object_name=object_name, contents='Another thing')
- # Use @Retry as hedge against bucket listing eventual consistency.
- @Retry(AssertionError, tries=3, timeout_secs=1, logger=self.logger)
- def _getObjects():
- stdout = self.RunGsUtil(['ls', '-a', suri(obj)], return_stdout=True)
- lines = stdout.strip().split('\n')
- self.assertEqual(len(lines), 2)
- return lines
+ lines = self.AssertNObjectsInBucket(bucket, 2, versioned=True)
- obj_v1, obj_v2 = _getObjects()
+ obj_v1, obj_v2 = lines[0], lines[1]
test_regex = self._MakeScopeRegex(
- 'GroupByEmail', self.GROUP_TEST_ADDRESS, 'READ')
- xml = self.RunGsUtil(self._get_acl_prefix + [obj_v1], return_stdout=True)
- self.assertNotRegexpMatches(xml, test_regex)
+ 'READER', 'group', self.GROUP_TEST_ADDRESS)
+ json_text = self.RunGsUtil(self._get_acl_prefix + [obj_v1],
+ return_stdout=True)
+ self.assertNotRegexpMatches(json_text, test_regex)
self.RunGsUtil(self._ch_acl_prefix +
['-g', self.GROUP_TEST_ADDRESS+':READ', obj_v1])
- xml = self.RunGsUtil(self._get_acl_prefix + [obj_v1], return_stdout=True)
- self.assertRegexpMatches(xml, test_regex)
+ json_text = self.RunGsUtil(self._get_acl_prefix + [obj_v1],
+ return_stdout=True)
+ self.assertRegexpMatches(json_text, test_regex)
- xml = self.RunGsUtil(self._get_acl_prefix + [obj_v2], return_stdout=True)
- self.assertNotRegexpMatches(xml, test_regex)
+ json_text = self.RunGsUtil(self._get_acl_prefix + [obj_v2],
+ return_stdout=True)
+ self.assertNotRegexpMatches(json_text, test_regex)
def testBadRequestAclChange(self):
- stdout, stderr = self.RunGsUtil(self._ch_acl_prefix +
+ stdout, stderr = self.RunGsUtil(
+ self._ch_acl_prefix +
['-u', 'invalid_$$@hello.com:R', suri(self.sample_uri)],
return_stdout=True, return_stderr=True, expected_status=1)
- self.assertIn('Bad Request', stderr)
+ self.assertIn('BadRequestException', stderr)
self.assertNotIn('Retrying', stdout)
self.assertNotIn('Retrying', stderr)
-
+
def testAclGetWithoutFullControl(self):
object_uri = self.CreateObject(contents='foo')
with self.SetAnonymousBotoCreds():
- stderr = self.RunGsUtil(['acl', 'get', suri(object_uri)],
- return_stderr = True, expected_status=1)
- self.assertIn('Note that Full Control access is required to access ACLs.',
- stderr)
-
+ stderr = self.RunGsUtil(self._get_acl_prefix + [suri(object_uri)],
+ return_stderr=True, expected_status=1)
+ self.assertIn('AccessDeniedException', stderr)
+
def testTooFewArgumentsFails(self):
+ """Tests calling ACL commands with insufficient number of arguments."""
# No arguments for get, but valid subcommand.
stderr = self.RunGsUtil(self._get_acl_prefix, return_stderr=True,
expected_status=1)
@@ -466,6 +537,67 @@
stderr = self.RunGsUtil(['acl'], return_stderr=True, expected_status=1)
self.assertIn('command requires at least', stderr)
+ def testMinusF(self):
+ """Tests -f option to continue after failure."""
+ bucket_uri = self.CreateBucket()
+ obj_uri = suri(self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
+ contents='foo'))
+ acl_string = self.RunGsUtil(self._get_acl_prefix + [obj_uri],
+ return_stdout=True)
+ self.RunGsUtil(self._set_acl_prefix +
+ ['-f', 'public-read', suri(bucket_uri) + 'foo2', obj_uri],
+ expected_status=1)
+ acl_string2 = self.RunGsUtil(self._get_acl_prefix + [obj_uri],
+ return_stdout=True)
+
+ self.assertNotEqual(acl_string, acl_string2)
+
+
+class TestS3CompatibleAcl(TestAclBase):
+ """ACL integration tests that work for s3 and gs URLs."""
+
+ def testAclObjectGetSet(self):
+ bucket_uri = self.CreateBucket()
+ obj_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo')
+ self.AssertNObjectsInBucket(bucket_uri, 1)
+
+ stdout = self.RunGsUtil(self._get_acl_prefix + [suri(obj_uri)],
+ return_stdout=True)
+ set_contents = self.CreateTempFile(contents=stdout)
+ self.RunGsUtil(self._set_acl_prefix + [set_contents, suri(obj_uri)])
+
+ def testAclBucketGetSet(self):
+ bucket_uri = self.CreateBucket()
+ stdout = self.RunGsUtil(self._get_acl_prefix + [suri(bucket_uri)],
+ return_stdout=True)
+ set_contents = self.CreateTempFile(contents=stdout)
+ self.RunGsUtil(self._set_acl_prefix + [set_contents, suri(bucket_uri)])
+
+
+@SkipForGS('S3 ACLs accept XML and should not cause an XML warning.')
+class TestS3OnlyAcl(TestAclBase):
+ """ACL integration tests that work only for s3 URLs."""
+
+ # TODO: Format all test case names consistently.
+ def test_set_xml_acl(self):
+ """Ensures XML content does not return an XML warning for S3."""
+ obj_uri = suri(self.CreateObject(contents='foo'))
+ inpath = self.CreateTempFile(contents='<ValidXml></ValidXml>')
+ stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, obj_uri],
+ return_stderr=True, expected_status=1)
+ self.assertIn('BadRequestException', stderr)
+ self.assertNotIn('XML ACL data provided', stderr)
+
+ def test_set_xml_acl_bucket(self):
+ """Ensures XML content does not return an XML warning for S3."""
+ bucket_uri = suri(self.CreateBucket())
+ inpath = self.CreateTempFile(contents='<ValidXml></ValidXml>')
+ stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, bucket_uri],
+ return_stderr=True, expected_status=1)
+ self.assertIn('BadRequestException', stderr)
+ self.assertNotIn('XML ACL data provided', stderr)
+
+
class TestAclOldAlias(TestAcl):
_set_acl_prefix = ['setacl']
_get_acl_prefix = ['getacl']
« no previous file with comments | « gslib/tests/test_Doption.py ('k') | gslib/tests/test_bucketconfig.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698