OLD | NEW |
| (Empty) |
1 # -*- coding: utf-8 -*- | |
2 # Copyright 2014 Google Inc. All Rights Reserved. | |
3 # | |
4 # Licensed under the Apache License, Version 2.0 (the "License"); | |
5 # you may not use this file except in compliance with the License. | |
6 # You may obtain a copy of the License at | |
7 # | |
8 # http://www.apache.org/licenses/LICENSE-2.0 | |
9 # | |
10 # Unless required by applicable law or agreed to in writing, software | |
11 # distributed under the License is distributed on an "AS IS" BASIS, | |
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 # See the License for the specific language governing permissions and | |
14 # limitations under the License. | |
15 """Tests for signurl command.""" | |
16 from datetime import timedelta | |
17 import pkgutil | |
18 | |
19 import gslib.commands.signurl | |
20 from gslib.commands.signurl import HAVE_OPENSSL | |
21 from gslib.exception import CommandException | |
22 import gslib.tests.testcase as testcase | |
23 from gslib.tests.testcase.integration_testcase import SkipForS3 | |
24 from gslib.tests.util import ObjectToURI as suri | |
25 from gslib.tests.util import unittest | |
26 | |
27 | |
28 # pylint: disable=protected-access | |
29 @unittest.skipUnless(HAVE_OPENSSL, 'signurl requires pyopenssl.') | |
30 @SkipForS3('Signed URLs are only supported for gs:// URLs.') | |
31 class TestSignUrl(testcase.GsUtilIntegrationTestCase): | |
32 """Integration tests for signurl command.""" | |
33 | |
34 def _GetKsFile(self): | |
35 if not hasattr(self, 'ks_file'): | |
36 # Dummy pkcs12 keystore generated with the command | |
37 | |
38 # openssl req -new -passout pass:notasecret -batch \ | |
39 # -x509 -keyout signed_url_test.key -out signed_url_test.pem \ | |
40 # -subj '/CN=test.apps.googleusercontent.com' | |
41 | |
42 # && | |
43 | |
44 # openssl pkcs12 -export -passin pass:notasecret \ | |
45 # -passout pass:notasecret -inkey signed_url_test.key \ | |
46 # -in signed_url_test.pem -out test.p12 | |
47 | |
48 # && | |
49 | |
50 # rm signed_url_test.key signed_url_test.pem | |
51 contents = pkgutil.get_data('gslib', 'tests/test_data/test.p12') | |
52 self.ks_file = self.CreateTempFile(contents=contents) | |
53 return self.ks_file | |
54 | |
55 def testSignUrlOutput(self): | |
56 """Tests signurl output of a sample object.""" | |
57 | |
58 object_url = self.CreateObject(contents='z') | |
59 stdout = self.RunGsUtil(['signurl', '-p', 'notasecret', | |
60 self._GetKsFile(), suri(object_url)], | |
61 return_stdout=True) | |
62 | |
63 self.assertIn(object_url.uri, stdout) | |
64 self.assertIn('test@developer.gserviceaccount.com', stdout) | |
65 self.assertIn('Expires=', stdout) | |
66 self.assertIn('\tGET\t', stdout) | |
67 | |
68 stdout = self.RunGsUtil(['signurl', '-m', 'PUT', '-p', | |
69 'notasecret', self._GetKsFile(), | |
70 'gs://test/test.txt'], return_stdout=True) | |
71 | |
72 self.assertIn('test@developer.gserviceaccount.com', stdout) | |
73 self.assertIn('Expires=', stdout) | |
74 self.assertIn('\tPUT\t', stdout) | |
75 | |
76 def testSignUrlWithURLEncodeRequiredChars(self): | |
77 objs = ['gs://example.org/test 1', 'gs://example.org/test/test 2', | |
78 'gs://example.org/Аудиоарi хив'] | |
79 expected_partial_urls = [ | |
80 ('https://storage.googleapis.com/example.org/test%201?GoogleAccessId=te' | |
81 'st@developer.gserviceaccount.com'), | |
82 ('https://storage.googleapis.com/example.org/test/test%202?GoogleAccess' | |
83 'Id=test@developer.gserviceaccount.com'), | |
84 ('https://storage.googleapis.com/example.org/%D0%90%D1%83%D0%B4%D0%B8%D' | |
85 '0%BE%D0%B0%D1%80i%20%D1%85%D0%B8%D0%B2?GoogleAccessId=test@developer.' | |
86 'gserviceaccount.com') | |
87 ] | |
88 | |
89 self.assertEquals(len(objs), len(expected_partial_urls)) | |
90 | |
91 cmd_args = ['signurl', '-p', 'notasecret', self._GetKsFile()] | |
92 cmd_args.extend(objs) | |
93 | |
94 stdout = self.RunGsUtil(cmd_args, return_stdout=True) | |
95 | |
96 lines = stdout.split('\n') | |
97 # Header, signed urls, trailing newline. | |
98 self.assertEquals(len(lines), len(objs) + 2) | |
99 | |
100 # Strip the header line to make the indices line up. | |
101 lines = lines[1:] | |
102 | |
103 for obj, line, partial_url in zip(objs, lines, expected_partial_urls): | |
104 self.assertIn(obj, line) | |
105 self.assertIn(partial_url, line) | |
106 | |
107 def testSignUrlWithWildcard(self): | |
108 objs = ['test1', 'test2', 'test3'] | |
109 bucket = self.CreateBucket() | |
110 obj_urls = [] | |
111 | |
112 for obj_name in objs: | |
113 obj_urls.append(self.CreateObject(bucket_uri=bucket, | |
114 object_name=obj_name, contents='')) | |
115 | |
116 stdout = self.RunGsUtil(['signurl', '-p', | |
117 'notasecret', self._GetKsFile(), | |
118 suri(bucket) + '/*'], return_stdout=True) | |
119 | |
120 # Header, 3 signed urls, trailing newline | |
121 self.assertEquals(len(stdout.split('\n')), 5) | |
122 | |
123 for obj_url in obj_urls: | |
124 self.assertIn(suri(obj_url), stdout) | |
125 | |
126 def testSignUrlOfNonObjectUrl(self): | |
127 """Tests the signurl output of a non-existent file.""" | |
128 self.RunGsUtil(['signurl', self._GetKsFile(), 'gs://'], | |
129 expected_status=1, stdin='notasecret') | |
130 self.RunGsUtil(['signurl', 'file://tmp/abc'], expected_status=1) | |
131 | |
132 | |
133 @unittest.skipUnless(HAVE_OPENSSL, 'signurl requires pyopenssl.') | |
134 class UnitTestSignUrl(testcase.GsUtilUnitTestCase): | |
135 """Unit tests for the signurl command.""" | |
136 | |
137 def setUp(self): | |
138 super(UnitTestSignUrl, self).setUp() | |
139 self.ks_contents = pkgutil.get_data('gslib', 'tests/test_data/test.p12') | |
140 | |
141 def testDurationSpec(self): | |
142 tests = [('1h', timedelta(hours=1)), | |
143 ('2d', timedelta(days=2)), | |
144 ('5D', timedelta(days=5)), | |
145 ('35s', timedelta(seconds=35)), | |
146 ('1h', timedelta(hours=1)), | |
147 ('33', timedelta(hours=33)), | |
148 ('22m', timedelta(minutes=22)), | |
149 ('3.7', None), | |
150 ('27Z', None), | |
151 ] | |
152 | |
153 for inp, expected in tests: | |
154 try: | |
155 td = gslib.commands.signurl._DurationToTimeDelta(inp) | |
156 self.assertEquals(td, expected) | |
157 except CommandException: | |
158 if expected is not None: | |
159 self.fail('{0} failed to parse') | |
160 | |
161 def testSignPut(self): | |
162 """Tests the return value of the _GenSignedUrl function with \ | |
163 a PUT method.""" | |
164 | |
165 expected = ('https://storage.googleapis.com/test/test.txt?' | |
166 'GoogleAccessId=test@developer.gserviceaccount.com' | |
167 '&Expires=1391816302&Signature=A6QbgTA8cXZCtjy2xCr401bdi0e' | |
168 '7zChTBQ6BX61L7AfytTGEQDMD%2BbvOQKjX7%2FsEh77cmzcSxOEKqTLUD' | |
169 'bbkPgPqW3j8sGPSRX9VM58bgj1vt9yU8cRKoegFHXAqsATx2G5rc%2FvEl' | |
170 'iFp9UWMfVj5TaukqlBAVuzZWlyx0aQa9tCKXRtC9YcxORxG41RfiowA2kd8' | |
171 'XBTQt4M9XTzpVyr5rVMzfr2LvtGf9UAJvlt8p6T6nThl2vy9%2FwBoPcMFa' | |
172 'OWQcGTagwjyKWDcI1vQPIFQLGftAcv3QnGZxZTtg8pZW%2FIxRJrBhfFfcA' | |
173 'c62hDKyaU2YssSMy%2FjUJynWx3TIiJjhg%3D%3D') | |
174 | |
175 expiration = 1391816302 | |
176 ks, client_id = (gslib.commands.signurl | |
177 ._ReadKeystore(self.ks_contents, 'notasecret')) | |
178 signed_url = (gslib.commands.signurl | |
179 ._GenSignedUrl(ks.get_privatekey(), | |
180 client_id, 'PUT', '', | |
181 '', expiration, 'test/test.txt')) | |
182 self.assertEquals(expected, signed_url) | |
183 | |
184 def testSignurlPutContentype(self): | |
185 """Tests the return value of the _GenSignedUrl function with \ | |
186 a PUT method and specified content type.""" | |
187 | |
188 expected = ('https://storage.googleapis.com/test/test.txt?' | |
189 'GoogleAccessId=test@developer.gserviceaccount.com&' | |
190 'Expires=1391816302&Signature=APn%2BCCVcQrfc1fKQXrs' | |
191 'PEZFj9%2FmASO%2BolR8xwgBY6PbWMkcCtrUVFBauP6t4NxqZO' | |
192 'UnbOFYTZYzul0RC57ZkEWJp3VcyDIHcn6usEE%2FTzUHhbDCDW' | |
193 'awAkZS7p8kO8IIACuJlF5s9xZmZzaEBtzF0%2BBOsGgBPBlg2y' | |
194 'zrhFB6cyyAwNiUgmhLQaVkdobnSwtI5QJkvXoIjJb6hhLiVbLC' | |
195 'rWdgSZVusjAKGlWCJsM%2B4TkCR%2Bi8AnrkECngcMHuJ9mYbS' | |
196 'XI1VfEmcnRVcfkKkJGZGctaDIWK%2FMTEmfYCW6USt3Zk2WowJ' | |
197 'SGuJHqEcFz0kyfAlkpmG%2Fl5E1FQROYqLN2kZQ%3D%3D') | |
198 | |
199 expiration = 1391816302 | |
200 ks, client_id = (gslib.commands.signurl | |
201 ._ReadKeystore(self.ks_contents, | |
202 'notasecret')) | |
203 signed_url = (gslib.commands.signurl | |
204 ._GenSignedUrl(ks.get_privatekey(), | |
205 client_id, 'PUT', '', | |
206 'text/plain', expiration, | |
207 'test/test.txt')) | |
208 self.assertEquals(expected, signed_url) | |
209 | |
210 def testSignurlGet(self): | |
211 """Tests the return value of the _GenSignedUrl function with \ | |
212 a GET method.""" | |
213 | |
214 expected = ('https://storage.googleapis.com/test/test.txt?' | |
215 'GoogleAccessId=test@developer.gserviceaccount.com&' | |
216 'Expires=0&Signature=TCZwe32cU%2BMksmLiSY9shHXQjLs1' | |
217 'F3y%2F%2F1M0UhiK4qsPRVNZVwI7YWvv2qa2Xa%2BVBBafboF0' | |
218 '1%2BWvx3ZG316pwpNIRR6y7jNnE0LvQmHE8afbm2VYCi%2B2JS' | |
219 'ZK2YZFJAyEek8si53jhYQEmaRq1zPfGbX84B2FJ8v4iI%2FTC1' | |
220 'I9OE5vHF0sWwIR9d73JDrFLjaync7QYFWRExdwvqlQX%2BPO3r' | |
221 'OG9Ns%2BcQFIN7npnsVjH28yNY9gBzXya8LYmNvUx6bWHWZMiu' | |
222 'fLwDZ0jejNeDZTOfQGRM%2B0vY7NslzaT06W1wo8P7McSkAZEl' | |
223 'DCbhR0Vo1fturPMwmAhi88f0qzRzywbg%3D%3D') | |
224 | |
225 expiration = 0 | |
226 ks, client_id = (gslib.commands.signurl | |
227 ._ReadKeystore(self.ks_contents, | |
228 'notasecret')) | |
229 signed_url = (gslib.commands.signurl | |
230 ._GenSignedUrl(ks.get_privatekey(), | |
231 client_id, 'GET', '', | |
232 '', expiration, 'test/test.txt')) | |
233 self.assertEquals(expected, signed_url) | |
OLD | NEW |