Index: third_party/gsutil/gslib/tests/test_defacl.py |
diff --git a/third_party/gsutil/gslib/tests/test_defacl.py b/third_party/gsutil/gslib/tests/test_defacl.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..f9339556fc4572d4cad17415c935b93a128ab02f |
--- /dev/null |
+++ b/third_party/gsutil/gslib/tests/test_defacl.py |
@@ -0,0 +1,187 @@ |
+# -*- coding: utf-8 -*- |
+# Copyright 2013 Google Inc. All Rights Reserved. |
+# |
+# Licensed under the Apache License, Version 2.0 (the "License"); |
+# you may not use this file except in compliance with the License. |
+# You may obtain a copy of the License at |
+# |
+# http://www.apache.org/licenses/LICENSE-2.0 |
+# |
+# Unless required by applicable law or agreed to in writing, software |
+# distributed under the License is distributed on an "AS IS" BASIS, |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
+# See the License for the specific language governing permissions and |
+# limitations under the License. |
+"""Integration tests for the defacl command.""" |
+ |
+from __future__ import absolute_import |
+ |
+import os |
+import re |
+import gslib.tests.testcase as case |
+from gslib.tests.testcase.integration_testcase import SkipForS3 |
+from gslib.tests.util import ObjectToURI as suri |
+ |
+PUBLIC_READ_JSON_ACL_TEXT = '"entity":"allUsers","role":"READER"' |
+ |
+ |
+@SkipForS3('S3 does not support default object ACLs.') |
+class TestDefacl(case.GsUtilIntegrationTestCase): |
+ """Integration tests for the defacl command.""" |
+ |
+ _defacl_ch_prefix = ['defacl', 'ch'] |
+ _defacl_get_prefix = ['defacl', 'get'] |
+ _defacl_set_prefix = ['defacl', 'set'] |
+ |
+ def _MakeScopeRegex(self, role, entity_type, email_address): |
+ template_regex = (r'\{.*"entity":\s*"%s-%s".*"role":\s*"%s".*\}' % |
+ (entity_type, email_address, role)) |
+ return re.compile(template_regex, flags=re.DOTALL) |
+ |
+ def testChangeDefaultAcl(self): |
+ """Tests defacl ch.""" |
+ bucket = self.CreateBucket() |
+ |
+ test_regex = self._MakeScopeRegex( |
+ 'OWNER', 'group', self.GROUP_TEST_ADDRESS) |
+ test_regex2 = self._MakeScopeRegex( |
+ 'READER', 'group', self.GROUP_TEST_ADDRESS) |
+ json_text = self.RunGsUtil(self._defacl_get_prefix + |
+ [suri(bucket)], return_stdout=True) |
+ self.assertNotRegexpMatches(json_text, test_regex) |
+ |
+ self.RunGsUtil(self._defacl_ch_prefix + |
+ ['-g', self.GROUP_TEST_ADDRESS+':FC', suri(bucket)]) |
+ json_text2 = self.RunGsUtil(self._defacl_get_prefix + |
+ [suri(bucket)], return_stdout=True) |
+ self.assertRegexpMatches(json_text2, test_regex) |
+ |
+ self.RunGsUtil(self._defacl_ch_prefix + |
+ ['-g', self.GROUP_TEST_ADDRESS+':READ', suri(bucket)]) |
+ json_text3 = self.RunGsUtil(self._defacl_get_prefix + |
+ [suri(bucket)], return_stdout=True) |
+ self.assertRegexpMatches(json_text3, test_regex2) |
+ |
+ stderr = self.RunGsUtil(self._defacl_ch_prefix + |
+ ['-g', self.GROUP_TEST_ADDRESS+':WRITE', |
+ suri(bucket)], |
+ return_stderr=True, expected_status=1) |
+ self.assertIn('WRITER cannot be set as a default object ACL', stderr) |
+ |
+ def testChangeDefaultAclPrivate(self): |
+ bucket = self.CreateBucket() |
+ test_regex = self._MakeScopeRegex( |
+ 'READER', 'group', self.GROUP_TEST_ADDRESS) |
+ self.RunGsUtil(self._defacl_set_prefix + ['private', suri(bucket)]) |
+ json_text = self.RunGsUtil(self._defacl_get_prefix + |
+ [suri(bucket)], return_stdout=True) |
+ self.assertRegexpMatches(json_text, r'\[\]\s*') |
+ |
+ self.RunGsUtil(self._defacl_ch_prefix + |
+ ['-g', self.GROUP_TEST_ADDRESS+':READ', suri(bucket)]) |
+ json_text2 = self.RunGsUtil(self._defacl_get_prefix + |
+ [suri(bucket)], return_stdout=True) |
+ self.assertRegexpMatches(json_text2, test_regex) |
+ |
+ def testChangeMultipleBuckets(self): |
+ """Tests defacl ch on multiple buckets.""" |
+ bucket1 = self.CreateBucket() |
+ bucket2 = self.CreateBucket() |
+ |
+ test_regex = self._MakeScopeRegex( |
+ 'READER', 'group', self.GROUP_TEST_ADDRESS) |
+ json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket1)], |
+ return_stdout=True) |
+ self.assertNotRegexpMatches(json_text, test_regex) |
+ json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket2)], |
+ return_stdout=True) |
+ self.assertNotRegexpMatches(json_text, test_regex) |
+ |
+ self.RunGsUtil(self._defacl_ch_prefix + |
+ ['-g', self.GROUP_TEST_ADDRESS+':READ', |
+ suri(bucket1), suri(bucket2)]) |
+ json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket1)], |
+ return_stdout=True) |
+ self.assertRegexpMatches(json_text, test_regex) |
+ json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket2)], |
+ return_stdout=True) |
+ self.assertRegexpMatches(json_text, test_regex) |
+ |
+ def testChangeMultipleAcls(self): |
+ """Tests defacl ch with multiple ACL entries.""" |
+ bucket = self.CreateBucket() |
+ |
+ test_regex_group = self._MakeScopeRegex( |
+ 'READER', 'group', self.GROUP_TEST_ADDRESS) |
+ test_regex_user = self._MakeScopeRegex( |
+ 'OWNER', 'user', self.USER_TEST_ADDRESS) |
+ json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket)], |
+ return_stdout=True) |
+ self.assertNotRegexpMatches(json_text, test_regex_group) |
+ self.assertNotRegexpMatches(json_text, test_regex_user) |
+ |
+ self.RunGsUtil(self._defacl_ch_prefix + |
+ ['-g', self.GROUP_TEST_ADDRESS+':READ', |
+ '-u', self.USER_TEST_ADDRESS+':fc', suri(bucket)]) |
+ json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket)], |
+ return_stdout=True) |
+ self.assertRegexpMatches(json_text, test_regex_group) |
+ self.assertRegexpMatches(json_text, test_regex_user) |
+ |
+ def testEmptyDefAcl(self): |
+ bucket = self.CreateBucket() |
+ self.RunGsUtil(self._defacl_set_prefix + ['private', suri(bucket)]) |
+ stdout = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket)], |
+ return_stdout=True) |
+ self.assertEquals(stdout.rstrip(), '[]') |
+ self.RunGsUtil(self._defacl_ch_prefix + |
+ ['-u', self.USER_TEST_ADDRESS+':fc', suri(bucket)]) |
+ |
+ def testDeletePermissionsWithCh(self): |
+ """Tests removing permissions with defacl ch.""" |
+ bucket = self.CreateBucket() |
+ |
+ test_regex = self._MakeScopeRegex( |
+ 'OWNER', 'user', self.USER_TEST_ADDRESS) |
+ json_text = self.RunGsUtil( |
+ self._defacl_get_prefix + [suri(bucket)], return_stdout=True) |
+ self.assertNotRegexpMatches(json_text, test_regex) |
+ |
+ self.RunGsUtil(self._defacl_ch_prefix + |
+ ['-u', self.USER_TEST_ADDRESS+':fc', suri(bucket)]) |
+ json_text = self.RunGsUtil( |
+ self._defacl_get_prefix + [suri(bucket)], return_stdout=True) |
+ self.assertRegexpMatches(json_text, test_regex) |
+ |
+ self.RunGsUtil(self._defacl_ch_prefix + |
+ ['-d', self.USER_TEST_ADDRESS, suri(bucket)]) |
+ json_text = self.RunGsUtil( |
+ self._defacl_get_prefix + [suri(bucket)], return_stdout=True) |
+ self.assertNotRegexpMatches(json_text, test_regex) |
+ |
+ def testTooFewArgumentsFails(self): |
+ """Tests calling defacl with insufficient number of arguments.""" |
+ # No arguments for get, but valid subcommand. |
+ stderr = self.RunGsUtil(self._defacl_get_prefix, return_stderr=True, |
+ expected_status=1) |
+ self.assertIn('command requires at least', stderr) |
+ |
+ # No arguments for set, but valid subcommand. |
+ stderr = self.RunGsUtil(self._defacl_set_prefix, return_stderr=True, |
+ expected_status=1) |
+ self.assertIn('command requires at least', stderr) |
+ |
+ # No arguments for ch, but valid subcommand. |
+ stderr = self.RunGsUtil(self._defacl_ch_prefix, return_stderr=True, |
+ expected_status=1) |
+ self.assertIn('command requires at least', stderr) |
+ |
+ # Neither arguments nor subcommand. |
+ stderr = self.RunGsUtil(['defacl'], return_stderr=True, expected_status=1) |
+ self.assertIn('command requires at least', stderr) |
+ |
+ |
+class TestDefaclOldAlias(TestDefacl): |
+ _defacl_ch_prefix = ['chdefacl'] |
+ _defacl_get_prefix = ['getdefacl'] |
+ _defacl_set_prefix = ['setdefacl'] |