Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(389)

Side by Side Diff: third_party/gsutil/third_party/boto/tests/integration/s3/test_key.py

Issue 1377933002: [catapult] - Copy Telemetry's gsutilz over to third_party. (Closed) Base URL: https://github.com/catapult-project/catapult.git@master
Patch Set: Rename to gsutil. Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # -*- coding: utf-8 -*-
2 # Copyright (c) 2012 Mitch Garnaat http://garnaat.org/
3 # All rights reserved.
4 #
5 # Permission is hereby granted, free of charge, to any person obtaining a
6 # copy of this software and associated documentation files (the
7 # "Software"), to deal in the Software without restriction, including
8 # without limitation the rights to use, copy, modify, merge, publish, dis-
9 # tribute, sublicense, and/or sell copies of the Software, and to permit
10 # persons to whom the Software is furnished to do so, subject to the fol-
11 # lowing conditions:
12 #
13 # The above copyright notice and this permission notice shall be included
14 # in all copies or substantial portions of the Software.
15 #
16 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
17 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
18 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
19 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
20 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22 # IN THE SOFTWARE.
23
24 """
25 Some unit tests for S3 Key
26 """
27
28 from tests.unit import unittest
29 import time
30
31 import boto.s3
32 from boto.compat import six, StringIO, urllib
33 from boto.s3.connection import S3Connection
34 from boto.s3.key import Key
35 from boto.exception import S3ResponseError
36
37
38 class S3KeyTest(unittest.TestCase):
39 s3 = True
40
41 def setUp(self):
42 self.conn = S3Connection()
43 self.bucket_name = 'keytest-%d' % int(time.time())
44 self.bucket = self.conn.create_bucket(self.bucket_name)
45
46 def tearDown(self):
47 for key in self.bucket:
48 key.delete()
49 self.bucket.delete()
50
51 def test_set_contents_from_file_dataloss(self):
52 # Create an empty stringio and write to it.
53 content = "abcde"
54 sfp = StringIO()
55 sfp.write(content)
56 # Try set_contents_from_file() without rewinding sfp
57 k = self.bucket.new_key("k")
58 try:
59 k.set_contents_from_file(sfp)
60 self.fail("forgot to rewind so should fail.")
61 except AttributeError:
62 pass
63 # call with rewind and check if we wrote 5 bytes
64 k.set_contents_from_file(sfp, rewind=True)
65 self.assertEqual(k.size, 5)
66 # check actual contents by getting it.
67 kn = self.bucket.new_key("k")
68 ks = kn.get_contents_as_string().decode('utf-8')
69 self.assertEqual(ks, content)
70
71 # finally, try with a 0 length string
72 sfp = StringIO()
73 k = self.bucket.new_key("k")
74 k.set_contents_from_file(sfp)
75 self.assertEqual(k.size, 0)
76 # check actual contents by getting it.
77 kn = self.bucket.new_key("k")
78 ks = kn.get_contents_as_string().decode('utf-8')
79 self.assertEqual(ks, "")
80
81 def test_set_contents_as_file(self):
82 content="01234567890123456789"
83 sfp = StringIO(content)
84
85 # fp is set at 0 for just opened (for read) files.
86 # set_contents should write full content to key.
87 k = self.bucket.new_key("k")
88 k.set_contents_from_file(sfp)
89 self.assertEqual(k.size, 20)
90 kn = self.bucket.new_key("k")
91 ks = kn.get_contents_as_string().decode('utf-8')
92 self.assertEqual(ks, content)
93
94 # set fp to 5 and set contents. this should
95 # set "567890123456789" to the key
96 sfp.seek(5)
97 k = self.bucket.new_key("k")
98 k.set_contents_from_file(sfp)
99 self.assertEqual(k.size, 15)
100 kn = self.bucket.new_key("k")
101 ks = kn.get_contents_as_string().decode('utf-8')
102 self.assertEqual(ks, content[5:])
103
104 # set fp to 5 and only set 5 bytes. this should
105 # write the value "56789" to the key.
106 sfp.seek(5)
107 k = self.bucket.new_key("k")
108 k.set_contents_from_file(sfp, size=5)
109 self.assertEqual(k.size, 5)
110 self.assertEqual(sfp.tell(), 10)
111 kn = self.bucket.new_key("k")
112 ks = kn.get_contents_as_string().decode('utf-8')
113 self.assertEqual(ks, content[5:10])
114
115 def test_set_contents_with_md5(self):
116 content="01234567890123456789"
117 sfp = StringIO(content)
118
119 # fp is set at 0 for just opened (for read) files.
120 # set_contents should write full content to key.
121 k = self.bucket.new_key("k")
122 good_md5 = k.compute_md5(sfp)
123 k.set_contents_from_file(sfp, md5=good_md5)
124 kn = self.bucket.new_key("k")
125 ks = kn.get_contents_as_string().decode('utf-8')
126 self.assertEqual(ks, content)
127
128 # set fp to 5 and only set 5 bytes. this should
129 # write the value "56789" to the key.
130 sfp.seek(5)
131 k = self.bucket.new_key("k")
132 good_md5 = k.compute_md5(sfp, size=5)
133 k.set_contents_from_file(sfp, size=5, md5=good_md5)
134 self.assertEqual(sfp.tell(), 10)
135 kn = self.bucket.new_key("k")
136 ks = kn.get_contents_as_string().decode('utf-8')
137 self.assertEqual(ks, content[5:10])
138
139 # let's try a wrong md5 by just altering it.
140 k = self.bucket.new_key("k")
141 sfp.seek(0)
142 hexdig, base64 = k.compute_md5(sfp)
143 bad_md5 = (hexdig, base64[3:])
144 try:
145 k.set_contents_from_file(sfp, md5=bad_md5)
146 self.fail("should fail with bad md5")
147 except S3ResponseError:
148 pass
149
150 def test_get_contents_with_md5(self):
151 content="01234567890123456789"
152 sfp = StringIO(content)
153
154 k = self.bucket.new_key("k")
155 k.set_contents_from_file(sfp)
156 kn = self.bucket.new_key("k")
157 s = kn.get_contents_as_string().decode('utf-8')
158 self.assertEqual(kn.md5, k.md5)
159 self.assertEqual(s, content)
160
161 def test_file_callback(self):
162 def callback(wrote, total):
163 self.my_cb_cnt += 1
164 self.assertNotEqual(wrote, self.my_cb_last, "called twice with same value")
165 self.my_cb_last = wrote
166
167 # Zero bytes written => 1 call
168 self.my_cb_cnt = 0
169 self.my_cb_last = None
170 k = self.bucket.new_key("k")
171 k.BufferSize = 2
172 sfp = StringIO("")
173 k.set_contents_from_file(sfp, cb=callback, num_cb=10)
174 self.assertEqual(self.my_cb_cnt, 1)
175 self.assertEqual(self.my_cb_last, 0)
176 sfp.close()
177
178 # Read back zero bytes => 1 call
179 self.my_cb_cnt = 0
180 self.my_cb_last = None
181 s = k.get_contents_as_string(cb=callback)
182 self.assertEqual(self.my_cb_cnt, 1)
183 self.assertEqual(self.my_cb_last, 0)
184
185 content="01234567890123456789"
186 sfp = StringIO(content)
187
188 # expect 2 calls due start/finish
189 self.my_cb_cnt = 0
190 self.my_cb_last = None
191 k = self.bucket.new_key("k")
192 k.set_contents_from_file(sfp, cb=callback, num_cb=10)
193 self.assertEqual(self.my_cb_cnt, 2)
194 self.assertEqual(self.my_cb_last, 20)
195
196 # Read back all bytes => 2 calls
197 self.my_cb_cnt = 0
198 self.my_cb_last = None
199 s = k.get_contents_as_string(cb=callback).decode('utf-8')
200 self.assertEqual(self.my_cb_cnt, 2)
201 self.assertEqual(self.my_cb_last, 20)
202 self.assertEqual(s, content)
203
204 # rewind sfp and try upload again. -1 should call
205 # for every read/write so that should make 11 when bs=2
206 sfp.seek(0)
207 self.my_cb_cnt = 0
208 self.my_cb_last = None
209 k = self.bucket.new_key("k")
210 k.BufferSize = 2
211 k.set_contents_from_file(sfp, cb=callback, num_cb=-1)
212 self.assertEqual(self.my_cb_cnt, 11)
213 self.assertEqual(self.my_cb_last, 20)
214
215 # Read back all bytes => 11 calls
216 self.my_cb_cnt = 0
217 self.my_cb_last = None
218 s = k.get_contents_as_string(cb=callback, num_cb=-1).decode('utf-8')
219 self.assertEqual(self.my_cb_cnt, 11)
220 self.assertEqual(self.my_cb_last, 20)
221 self.assertEqual(s, content)
222
223 # no more than 1 times => 2 times
224 # last time always 20 bytes
225 sfp.seek(0)
226 self.my_cb_cnt = 0
227 self.my_cb_last = None
228 k = self.bucket.new_key("k")
229 k.BufferSize = 2
230 k.set_contents_from_file(sfp, cb=callback, num_cb=1)
231 self.assertTrue(self.my_cb_cnt <= 2)
232 self.assertEqual(self.my_cb_last, 20)
233
234 # no more than 1 times => 2 times
235 self.my_cb_cnt = 0
236 self.my_cb_last = None
237 s = k.get_contents_as_string(cb=callback, num_cb=1).decode('utf-8')
238 self.assertTrue(self.my_cb_cnt <= 2)
239 self.assertEqual(self.my_cb_last, 20)
240 self.assertEqual(s, content)
241
242 # no more than 2 times
243 # last time always 20 bytes
244 sfp.seek(0)
245 self.my_cb_cnt = 0
246 self.my_cb_last = None
247 k = self.bucket.new_key("k")
248 k.BufferSize = 2
249 k.set_contents_from_file(sfp, cb=callback, num_cb=2)
250 self.assertTrue(self.my_cb_cnt <= 2)
251 self.assertEqual(self.my_cb_last, 20)
252
253 # no more than 2 times
254 self.my_cb_cnt = 0
255 self.my_cb_last = None
256 s = k.get_contents_as_string(cb=callback, num_cb=2).decode('utf-8')
257 self.assertTrue(self.my_cb_cnt <= 2)
258 self.assertEqual(self.my_cb_last, 20)
259 self.assertEqual(s, content)
260
261 # no more than 3 times
262 # last time always 20 bytes
263 sfp.seek(0)
264 self.my_cb_cnt = 0
265 self.my_cb_last = None
266 k = self.bucket.new_key("k")
267 k.BufferSize = 2
268 k.set_contents_from_file(sfp, cb=callback, num_cb=3)
269 self.assertTrue(self.my_cb_cnt <= 3)
270 self.assertEqual(self.my_cb_last, 20)
271
272 # no more than 3 times
273 self.my_cb_cnt = 0
274 self.my_cb_last = None
275 s = k.get_contents_as_string(cb=callback, num_cb=3).decode('utf-8')
276 self.assertTrue(self.my_cb_cnt <= 3)
277 self.assertEqual(self.my_cb_last, 20)
278 self.assertEqual(s, content)
279
280 # no more than 4 times
281 # last time always 20 bytes
282 sfp.seek(0)
283 self.my_cb_cnt = 0
284 self.my_cb_last = None
285 k = self.bucket.new_key("k")
286 k.BufferSize = 2
287 k.set_contents_from_file(sfp, cb=callback, num_cb=4)
288 self.assertTrue(self.my_cb_cnt <= 4)
289 self.assertEqual(self.my_cb_last, 20)
290
291 # no more than 4 times
292 self.my_cb_cnt = 0
293 self.my_cb_last = None
294 s = k.get_contents_as_string(cb=callback, num_cb=4).decode('utf-8')
295 self.assertTrue(self.my_cb_cnt <= 4)
296 self.assertEqual(self.my_cb_last, 20)
297 self.assertEqual(s, content)
298
299 # no more than 6 times
300 # last time always 20 bytes
301 sfp.seek(0)
302 self.my_cb_cnt = 0
303 self.my_cb_last = None
304 k = self.bucket.new_key("k")
305 k.BufferSize = 2
306 k.set_contents_from_file(sfp, cb=callback, num_cb=6)
307 self.assertTrue(self.my_cb_cnt <= 6)
308 self.assertEqual(self.my_cb_last, 20)
309
310 # no more than 6 times
311 self.my_cb_cnt = 0
312 self.my_cb_last = None
313 s = k.get_contents_as_string(cb=callback, num_cb=6).decode('utf-8')
314 self.assertTrue(self.my_cb_cnt <= 6)
315 self.assertEqual(self.my_cb_last, 20)
316 self.assertEqual(s, content)
317
318 # no more than 10 times
319 # last time always 20 bytes
320 sfp.seek(0)
321 self.my_cb_cnt = 0
322 self.my_cb_last = None
323 k = self.bucket.new_key("k")
324 k.BufferSize = 2
325 k.set_contents_from_file(sfp, cb=callback, num_cb=10)
326 self.assertTrue(self.my_cb_cnt <= 10)
327 self.assertEqual(self.my_cb_last, 20)
328
329 # no more than 10 times
330 self.my_cb_cnt = 0
331 self.my_cb_last = None
332 s = k.get_contents_as_string(cb=callback, num_cb=10).decode('utf-8')
333 self.assertTrue(self.my_cb_cnt <= 10)
334 self.assertEqual(self.my_cb_last, 20)
335 self.assertEqual(s, content)
336
337 # no more than 1000 times
338 # last time always 20 bytes
339 sfp.seek(0)
340 self.my_cb_cnt = 0
341 self.my_cb_last = None
342 k = self.bucket.new_key("k")
343 k.BufferSize = 2
344 k.set_contents_from_file(sfp, cb=callback, num_cb=1000)
345 self.assertTrue(self.my_cb_cnt <= 1000)
346 self.assertEqual(self.my_cb_last, 20)
347
348 # no more than 1000 times
349 self.my_cb_cnt = 0
350 self.my_cb_last = None
351 s = k.get_contents_as_string(cb=callback, num_cb=1000).decode('utf-8')
352 self.assertTrue(self.my_cb_cnt <= 1000)
353 self.assertEqual(self.my_cb_last, 20)
354 self.assertEqual(s, content)
355
356 def test_website_redirects(self):
357 self.bucket.configure_website('index.html')
358 key = self.bucket.new_key('redirect-key')
359 self.assertTrue(key.set_redirect('http://www.amazon.com/'))
360 self.assertEqual(key.get_redirect(), 'http://www.amazon.com/')
361
362 self.assertTrue(key.set_redirect('http://aws.amazon.com/'))
363 self.assertEqual(key.get_redirect(), 'http://aws.amazon.com/')
364
365 def test_website_redirect_none_configured(self):
366 key = self.bucket.new_key('redirect-key')
367 key.set_contents_from_string('')
368 self.assertEqual(key.get_redirect(), None)
369
370 def test_website_redirect_with_bad_value(self):
371 self.bucket.configure_website('index.html')
372 key = self.bucket.new_key('redirect-key')
373 with self.assertRaises(key.provider.storage_response_error):
374 # Must start with a / or http
375 key.set_redirect('ftp://ftp.example.org')
376 with self.assertRaises(key.provider.storage_response_error):
377 # Must start with a / or http
378 key.set_redirect('')
379
380 def test_setting_date(self):
381 key = self.bucket.new_key('test_date')
382 # This should actually set x-amz-meta-date & not fail miserably.
383 key.set_metadata('date', '20130524T155935Z')
384 key.set_contents_from_string('Some text here.')
385
386 check = self.bucket.get_key('test_date')
387 self.assertEqual(check.get_metadata('date'), u'20130524T155935Z')
388 self.assertTrue('x-amz-meta-date' in check._get_remote_metadata())
389
390 def test_header_casing(self):
391 key = self.bucket.new_key('test_header_case')
392 # Using anything but CamelCase on ``Content-Type`` or ``Content-MD5``
393 # used to cause a signature error (when using ``s3`` for signing).
394 key.set_metadata('Content-type', 'application/json')
395 key.set_metadata('Content-md5', 'XmUKnus7svY1frWsVskxXg==')
396 key.set_contents_from_string('{"abc": 123}')
397
398 check = self.bucket.get_key('test_header_case')
399 self.assertEqual(check.content_type, 'application/json')
400
401 def test_header_encoding(self):
402 key = self.bucket.new_key('test_header_encoding')
403
404 key.set_metadata('Cache-control', u'public, max-age=500')
405 key.set_metadata('Test-Plus', u'A plus (+)')
406 key.set_metadata('Content-disposition', u'filename=Schöne Zeit.txt')
407 key.set_metadata('Content-Encoding', 'gzip')
408 key.set_metadata('Content-Language', 'de')
409 key.set_metadata('Content-Type', 'application/pdf')
410 self.assertEqual(key.content_type, 'application/pdf')
411 key.set_metadata('X-Robots-Tag', 'all')
412 key.set_metadata('Expires', u'Thu, 01 Dec 1994 16:00:00 GMT')
413 key.set_contents_from_string('foo')
414
415 check = self.bucket.get_key('test_header_encoding')
416 remote_metadata = check._get_remote_metadata()
417
418 # TODO: investigate whether encoding ' ' as '%20' makes sense
419 self.assertEqual(check.cache_control, 'public,%20max-age=500')
420 self.assertEqual(remote_metadata['cache-control'], 'public,%20max-age=50 0')
421 self.assertEqual(check.get_metadata('test-plus'), 'A plus (+)')
422 self.assertEqual(check.content_disposition, 'filename=Sch%C3%B6ne%20Zeit .txt')
423 self.assertEqual(remote_metadata['content-disposition'], 'filename=Sch%C 3%B6ne%20Zeit.txt')
424 self.assertEqual(check.content_encoding, 'gzip')
425 self.assertEqual(remote_metadata['content-encoding'], 'gzip')
426 self.assertEqual(check.content_language, 'de')
427 self.assertEqual(remote_metadata['content-language'], 'de')
428 self.assertEqual(check.content_type, 'application/pdf')
429 self.assertEqual(remote_metadata['content-type'], 'application/pdf')
430 self.assertEqual(check.x_robots_tag, 'all')
431 self.assertEqual(remote_metadata['x-robots-tag'], 'all')
432 self.assertEqual(check.expires, 'Thu,%2001%20Dec%201994%2016:00:00%20GMT ')
433 self.assertEqual(remote_metadata['expires'], 'Thu,%2001%20Dec%201994%201 6:00:00%20GMT')
434
435 expected = u'filename=Schöne Zeit.txt'
436 if six.PY2:
437 # Newer versions of python default to unicode strings, but python 2
438 # requires encoding to UTF-8 to compare the two properly
439 expected = expected.encode('utf-8')
440
441 self.assertEqual(
442 urllib.parse.unquote(check.content_disposition),
443 expected
444 )
445
446 def test_set_contents_with_sse_c(self):
447 content="01234567890123456789"
448 # the plain text of customer key is "01testKeyToSSEC!"
449 header = {
450 "x-amz-server-side-encryption-customer-algorithm" :
451 "AES256",
452 "x-amz-server-side-encryption-customer-key" :
453 "MAAxAHQAZQBzAHQASwBlAHkAVABvAFMAUwBFAEMAIQA=",
454 "x-amz-server-side-encryption-customer-key-MD5" :
455 "fUgCZDDh6bfEMuP2bN38mg=="
456 }
457 # upload and download content with AWS specified headers
458 k = self.bucket.new_key("testkey_for_sse_c")
459 k.set_contents_from_string(content, headers=header)
460 kn = self.bucket.new_key("testkey_for_sse_c")
461 ks = kn.get_contents_as_string(headers=header)
462 self.assertEqual(ks, content.encode('utf-8'))
463
464
465 class S3KeySigV4Test(unittest.TestCase):
466 def setUp(self):
467 self.conn = boto.s3.connect_to_region('eu-central-1')
468 self.bucket_name = 'boto-sigv4-key-%d' % int(time.time())
469 self.bucket = self.conn.create_bucket(self.bucket_name,
470 location='eu-central-1')
471
472 def tearDown(self):
473 for key in self.bucket:
474 key.delete()
475 self.bucket.delete()
476
477 def test_put_get_with_non_string_headers_key(self):
478 k = Key(self.bucket)
479 k.key = 'foobar'
480 body = 'This is a test of S3'
481 # A content-length header will be added to this request since it
482 # has a body.
483 k.set_contents_from_string(body)
484 # Set a header that has an integer. This checks for a bug where
485 # the sigv4 signer assumes that all of the headers are strings.
486 headers = {'Content-Length': 0}
487 from_s3_key = self.bucket.get_key('foobar', headers=headers)
488 self.assertEqual(from_s3_key.get_contents_as_string().decode('utf-8'),
489 body)
490
491
492 class S3KeyVersionCopyTest(unittest.TestCase):
493 def setUp(self):
494 self.conn = S3Connection()
495 self.bucket_name = 'boto-key-version-copy-%d' % int(time.time())
496 self.bucket = self.conn.create_bucket(self.bucket_name)
497 self.bucket.configure_versioning(True)
498
499 def tearDown(self):
500 for key in self.bucket.list_versions():
501 key.delete()
502 self.bucket.delete()
503
504 def test_key_overwrite_and_copy(self):
505 first_content = "abcdefghijklm"
506 second_content = "nopqrstuvwxyz"
507 k = Key(self.bucket, 'testkey')
508 k.set_contents_from_string(first_content)
509 # Wait for S3's eventual consistency (may not be necessary)
510 while self.bucket.get_key('testkey') is None:
511 time.sleep(5)
512 # Get the first version_id
513 first_key = self.bucket.get_key('testkey')
514 first_version_id = first_key.version_id
515 # Overwrite the key
516 k = Key(self.bucket, 'testkey')
517 k.set_contents_from_string(second_content)
518 # Wait for eventual consistency
519 while True:
520 second_key = self.bucket.get_key('testkey')
521 if second_key is None or second_key.version_id == first_version_id:
522 time.sleep(5)
523 else:
524 break
525 # Copy first key (no longer the current version) to a new key
526 source_key = self.bucket.get_key('testkey',
527 version_id=first_version_id)
528 source_key.copy(self.bucket, 'copiedkey')
529 while self.bucket.get_key('copiedkey') is None:
530 time.sleep(5)
531 copied_key = self.bucket.get_key('copiedkey')
532 copied_key_contents = copied_key.get_contents_as_string()
533 self.assertEqual(first_content, copied_key_contents)
534
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698