| OLD | NEW |
| (Empty) |
| 1 # -*- coding: utf-8 -*- | |
| 2 # Copyright 2014 Google Inc. All Rights Reserved. | |
| 3 # | |
| 4 # Licensed under the Apache License, Version 2.0 (the "License"); | |
| 5 # you may not use this file except in compliance with the License. | |
| 6 # You may obtain a copy of the License at | |
| 7 # | |
| 8 # http://www.apache.org/licenses/LICENSE-2.0 | |
| 9 # | |
| 10 # Unless required by applicable law or agreed to in writing, software | |
| 11 # distributed under the License is distributed on an "AS IS" BASIS, | |
| 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| 13 # See the License for the specific language governing permissions and | |
| 14 # limitations under the License. | |
| 15 """Tests for signurl command.""" | |
| 16 from datetime import timedelta | |
| 17 import pkgutil | |
| 18 | |
| 19 import gslib.commands.signurl | |
| 20 from gslib.commands.signurl import HAVE_OPENSSL | |
| 21 from gslib.exception import CommandException | |
| 22 import gslib.tests.testcase as testcase | |
| 23 from gslib.tests.testcase.integration_testcase import SkipForS3 | |
| 24 from gslib.tests.util import ObjectToURI as suri | |
| 25 from gslib.tests.util import unittest | |
| 26 | |
| 27 | |
| 28 # pylint: disable=protected-access | |
| 29 @unittest.skipUnless(HAVE_OPENSSL, 'signurl requires pyopenssl.') | |
| 30 @SkipForS3('Signed URLs are only supported for gs:// URLs.') | |
| 31 class TestSignUrl(testcase.GsUtilIntegrationTestCase): | |
| 32 """Integration tests for signurl command.""" | |
| 33 | |
| 34 def _GetKsFile(self): | |
| 35 if not hasattr(self, 'ks_file'): | |
| 36 # Dummy pkcs12 keystore generated with the command | |
| 37 | |
| 38 # openssl req -new -passout pass:notasecret -batch \ | |
| 39 # -x509 -keyout signed_url_test.key -out signed_url_test.pem \ | |
| 40 # -subj '/CN=test.apps.googleusercontent.com' | |
| 41 | |
| 42 # && | |
| 43 | |
| 44 # openssl pkcs12 -export -passin pass:notasecret \ | |
| 45 # -passout pass:notasecret -inkey signed_url_test.key \ | |
| 46 # -in signed_url_test.pem -out test.p12 | |
| 47 | |
| 48 # && | |
| 49 | |
| 50 # rm signed_url_test.key signed_url_test.pem | |
| 51 contents = pkgutil.get_data('gslib', 'tests/test_data/test.p12') | |
| 52 self.ks_file = self.CreateTempFile(contents=contents) | |
| 53 return self.ks_file | |
| 54 | |
| 55 def testSignUrlOutput(self): | |
| 56 """Tests signurl output of a sample object.""" | |
| 57 | |
| 58 object_url = self.CreateObject(contents='z') | |
| 59 stdout = self.RunGsUtil(['signurl', '-p', 'notasecret', | |
| 60 self._GetKsFile(), suri(object_url)], | |
| 61 return_stdout=True) | |
| 62 | |
| 63 self.assertIn(object_url.uri, stdout) | |
| 64 self.assertIn('test@developer.gserviceaccount.com', stdout) | |
| 65 self.assertIn('Expires=', stdout) | |
| 66 self.assertIn('\tGET\t', stdout) | |
| 67 | |
| 68 stdout = self.RunGsUtil(['signurl', '-m', 'PUT', '-p', | |
| 69 'notasecret', self._GetKsFile(), | |
| 70 'gs://test/test.txt'], return_stdout=True) | |
| 71 | |
| 72 self.assertIn('test@developer.gserviceaccount.com', stdout) | |
| 73 self.assertIn('Expires=', stdout) | |
| 74 self.assertIn('\tPUT\t', stdout) | |
| 75 | |
| 76 def testSignUrlWithURLEncodeRequiredChars(self): | |
| 77 objs = ['gs://example.org/test 1', 'gs://example.org/test/test 2', | |
| 78 'gs://example.org/Аудиоарi хив'] | |
| 79 expected_partial_urls = [ | |
| 80 ('https://storage.googleapis.com/example.org/test%201?GoogleAccessId=te' | |
| 81 'st@developer.gserviceaccount.com'), | |
| 82 ('https://storage.googleapis.com/example.org/test/test%202?GoogleAccess' | |
| 83 'Id=test@developer.gserviceaccount.com'), | |
| 84 ('https://storage.googleapis.com/example.org/%D0%90%D1%83%D0%B4%D0%B8%D' | |
| 85 '0%BE%D0%B0%D1%80i%20%D1%85%D0%B8%D0%B2?GoogleAccessId=test@developer.' | |
| 86 'gserviceaccount.com') | |
| 87 ] | |
| 88 | |
| 89 self.assertEquals(len(objs), len(expected_partial_urls)) | |
| 90 | |
| 91 cmd_args = ['signurl', '-p', 'notasecret', self._GetKsFile()] | |
| 92 cmd_args.extend(objs) | |
| 93 | |
| 94 stdout = self.RunGsUtil(cmd_args, return_stdout=True) | |
| 95 | |
| 96 lines = stdout.split('\n') | |
| 97 # Header, signed urls, trailing newline. | |
| 98 self.assertEquals(len(lines), len(objs) + 2) | |
| 99 | |
| 100 # Strip the header line to make the indices line up. | |
| 101 lines = lines[1:] | |
| 102 | |
| 103 for obj, line, partial_url in zip(objs, lines, expected_partial_urls): | |
| 104 self.assertIn(obj, line) | |
| 105 self.assertIn(partial_url, line) | |
| 106 | |
| 107 def testSignUrlWithWildcard(self): | |
| 108 objs = ['test1', 'test2', 'test3'] | |
| 109 bucket = self.CreateBucket() | |
| 110 obj_urls = [] | |
| 111 | |
| 112 for obj_name in objs: | |
| 113 obj_urls.append(self.CreateObject(bucket_uri=bucket, | |
| 114 object_name=obj_name, contents='')) | |
| 115 | |
| 116 stdout = self.RunGsUtil(['signurl', '-p', | |
| 117 'notasecret', self._GetKsFile(), | |
| 118 suri(bucket) + '/*'], return_stdout=True) | |
| 119 | |
| 120 # Header, 3 signed urls, trailing newline | |
| 121 self.assertEquals(len(stdout.split('\n')), 5) | |
| 122 | |
| 123 for obj_url in obj_urls: | |
| 124 self.assertIn(suri(obj_url), stdout) | |
| 125 | |
| 126 def testSignUrlOfNonObjectUrl(self): | |
| 127 """Tests the signurl output of a non-existent file.""" | |
| 128 self.RunGsUtil(['signurl', self._GetKsFile(), 'gs://'], | |
| 129 expected_status=1, stdin='notasecret') | |
| 130 self.RunGsUtil(['signurl', 'file://tmp/abc'], expected_status=1) | |
| 131 | |
| 132 | |
| 133 @unittest.skipUnless(HAVE_OPENSSL, 'signurl requires pyopenssl.') | |
| 134 class UnitTestSignUrl(testcase.GsUtilUnitTestCase): | |
| 135 """Unit tests for the signurl command.""" | |
| 136 | |
| 137 def setUp(self): | |
| 138 super(UnitTestSignUrl, self).setUp() | |
| 139 self.ks_contents = pkgutil.get_data('gslib', 'tests/test_data/test.p12') | |
| 140 | |
| 141 def testDurationSpec(self): | |
| 142 tests = [('1h', timedelta(hours=1)), | |
| 143 ('2d', timedelta(days=2)), | |
| 144 ('5D', timedelta(days=5)), | |
| 145 ('35s', timedelta(seconds=35)), | |
| 146 ('1h', timedelta(hours=1)), | |
| 147 ('33', timedelta(hours=33)), | |
| 148 ('22m', timedelta(minutes=22)), | |
| 149 ('3.7', None), | |
| 150 ('27Z', None), | |
| 151 ] | |
| 152 | |
| 153 for inp, expected in tests: | |
| 154 try: | |
| 155 td = gslib.commands.signurl._DurationToTimeDelta(inp) | |
| 156 self.assertEquals(td, expected) | |
| 157 except CommandException: | |
| 158 if expected is not None: | |
| 159 self.fail('{0} failed to parse') | |
| 160 | |
| 161 def testSignPut(self): | |
| 162 """Tests the return value of the _GenSignedUrl function with \ | |
| 163 a PUT method.""" | |
| 164 | |
| 165 expected = ('https://storage.googleapis.com/test/test.txt?' | |
| 166 'GoogleAccessId=test@developer.gserviceaccount.com' | |
| 167 '&Expires=1391816302&Signature=A6QbgTA8cXZCtjy2xCr401bdi0e' | |
| 168 '7zChTBQ6BX61L7AfytTGEQDMD%2BbvOQKjX7%2FsEh77cmzcSxOEKqTLUD' | |
| 169 'bbkPgPqW3j8sGPSRX9VM58bgj1vt9yU8cRKoegFHXAqsATx2G5rc%2FvEl' | |
| 170 'iFp9UWMfVj5TaukqlBAVuzZWlyx0aQa9tCKXRtC9YcxORxG41RfiowA2kd8' | |
| 171 'XBTQt4M9XTzpVyr5rVMzfr2LvtGf9UAJvlt8p6T6nThl2vy9%2FwBoPcMFa' | |
| 172 'OWQcGTagwjyKWDcI1vQPIFQLGftAcv3QnGZxZTtg8pZW%2FIxRJrBhfFfcA' | |
| 173 'c62hDKyaU2YssSMy%2FjUJynWx3TIiJjhg%3D%3D') | |
| 174 | |
| 175 expiration = 1391816302 | |
| 176 ks, client_id = (gslib.commands.signurl | |
| 177 ._ReadKeystore(self.ks_contents, 'notasecret')) | |
| 178 signed_url = (gslib.commands.signurl | |
| 179 ._GenSignedUrl(ks.get_privatekey(), | |
| 180 client_id, 'PUT', '', | |
| 181 '', expiration, 'test/test.txt')) | |
| 182 self.assertEquals(expected, signed_url) | |
| 183 | |
| 184 def testSignurlPutContentype(self): | |
| 185 """Tests the return value of the _GenSignedUrl function with \ | |
| 186 a PUT method and specified content type.""" | |
| 187 | |
| 188 expected = ('https://storage.googleapis.com/test/test.txt?' | |
| 189 'GoogleAccessId=test@developer.gserviceaccount.com&' | |
| 190 'Expires=1391816302&Signature=APn%2BCCVcQrfc1fKQXrs' | |
| 191 'PEZFj9%2FmASO%2BolR8xwgBY6PbWMkcCtrUVFBauP6t4NxqZO' | |
| 192 'UnbOFYTZYzul0RC57ZkEWJp3VcyDIHcn6usEE%2FTzUHhbDCDW' | |
| 193 'awAkZS7p8kO8IIACuJlF5s9xZmZzaEBtzF0%2BBOsGgBPBlg2y' | |
| 194 'zrhFB6cyyAwNiUgmhLQaVkdobnSwtI5QJkvXoIjJb6hhLiVbLC' | |
| 195 'rWdgSZVusjAKGlWCJsM%2B4TkCR%2Bi8AnrkECngcMHuJ9mYbS' | |
| 196 'XI1VfEmcnRVcfkKkJGZGctaDIWK%2FMTEmfYCW6USt3Zk2WowJ' | |
| 197 'SGuJHqEcFz0kyfAlkpmG%2Fl5E1FQROYqLN2kZQ%3D%3D') | |
| 198 | |
| 199 expiration = 1391816302 | |
| 200 ks, client_id = (gslib.commands.signurl | |
| 201 ._ReadKeystore(self.ks_contents, | |
| 202 'notasecret')) | |
| 203 signed_url = (gslib.commands.signurl | |
| 204 ._GenSignedUrl(ks.get_privatekey(), | |
| 205 client_id, 'PUT', '', | |
| 206 'text/plain', expiration, | |
| 207 'test/test.txt')) | |
| 208 self.assertEquals(expected, signed_url) | |
| 209 | |
| 210 def testSignurlGet(self): | |
| 211 """Tests the return value of the _GenSignedUrl function with \ | |
| 212 a GET method.""" | |
| 213 | |
| 214 expected = ('https://storage.googleapis.com/test/test.txt?' | |
| 215 'GoogleAccessId=test@developer.gserviceaccount.com&' | |
| 216 'Expires=0&Signature=TCZwe32cU%2BMksmLiSY9shHXQjLs1' | |
| 217 'F3y%2F%2F1M0UhiK4qsPRVNZVwI7YWvv2qa2Xa%2BVBBafboF0' | |
| 218 '1%2BWvx3ZG316pwpNIRR6y7jNnE0LvQmHE8afbm2VYCi%2B2JS' | |
| 219 'ZK2YZFJAyEek8si53jhYQEmaRq1zPfGbX84B2FJ8v4iI%2FTC1' | |
| 220 'I9OE5vHF0sWwIR9d73JDrFLjaync7QYFWRExdwvqlQX%2BPO3r' | |
| 221 'OG9Ns%2BcQFIN7npnsVjH28yNY9gBzXya8LYmNvUx6bWHWZMiu' | |
| 222 'fLwDZ0jejNeDZTOfQGRM%2B0vY7NslzaT06W1wo8P7McSkAZEl' | |
| 223 'DCbhR0Vo1fturPMwmAhi88f0qzRzywbg%3D%3D') | |
| 224 | |
| 225 expiration = 0 | |
| 226 ks, client_id = (gslib.commands.signurl | |
| 227 ._ReadKeystore(self.ks_contents, | |
| 228 'notasecret')) | |
| 229 signed_url = (gslib.commands.signurl | |
| 230 ._GenSignedUrl(ks.get_privatekey(), | |
| 231 client_id, 'GET', '', | |
| 232 '', expiration, 'test/test.txt')) | |
| 233 self.assertEquals(expected, signed_url) | |
| OLD | NEW |