OLD | NEW |
(Empty) | |
| 1 # -*- coding: utf-8 -*- |
| 2 # Copyright (c) 2012 Mitch Garnaat http://garnaat.org/ |
| 3 # All rights reserved. |
| 4 # |
| 5 # Permission is hereby granted, free of charge, to any person obtaining a |
| 6 # copy of this software and associated documentation files (the |
| 7 # "Software"), to deal in the Software without restriction, including |
| 8 # without limitation the rights to use, copy, modify, merge, publish, dis- |
| 9 # tribute, sublicense, and/or sell copies of the Software, and to permit |
| 10 # persons to whom the Software is furnished to do so, subject to the fol- |
| 11 # lowing conditions: |
| 12 # |
| 13 # The above copyright notice and this permission notice shall be included |
| 14 # in all copies or substantial portions of the Software. |
| 15 # |
| 16 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS |
| 17 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- |
| 18 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT |
| 19 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, |
| 20 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| 21 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
| 22 # IN THE SOFTWARE. |
| 23 |
| 24 """ |
| 25 Some unit tests for S3 Key |
| 26 """ |
| 27 |
| 28 from tests.unit import unittest |
| 29 import time |
| 30 |
| 31 import boto.s3 |
| 32 from boto.compat import six, StringIO, urllib |
| 33 from boto.s3.connection import S3Connection |
| 34 from boto.s3.key import Key |
| 35 from boto.exception import S3ResponseError |
| 36 |
| 37 |
| 38 class S3KeyTest(unittest.TestCase): |
| 39 s3 = True |
| 40 |
| 41 def setUp(self): |
| 42 self.conn = S3Connection() |
| 43 self.bucket_name = 'keytest-%d' % int(time.time()) |
| 44 self.bucket = self.conn.create_bucket(self.bucket_name) |
| 45 |
| 46 def tearDown(self): |
| 47 for key in self.bucket: |
| 48 key.delete() |
| 49 self.bucket.delete() |
| 50 |
| 51 def test_set_contents_from_file_dataloss(self): |
| 52 # Create an empty stringio and write to it. |
| 53 content = "abcde" |
| 54 sfp = StringIO() |
| 55 sfp.write(content) |
| 56 # Try set_contents_from_file() without rewinding sfp |
| 57 k = self.bucket.new_key("k") |
| 58 try: |
| 59 k.set_contents_from_file(sfp) |
| 60 self.fail("forgot to rewind so should fail.") |
| 61 except AttributeError: |
| 62 pass |
| 63 # call with rewind and check if we wrote 5 bytes |
| 64 k.set_contents_from_file(sfp, rewind=True) |
| 65 self.assertEqual(k.size, 5) |
| 66 # check actual contents by getting it. |
| 67 kn = self.bucket.new_key("k") |
| 68 ks = kn.get_contents_as_string().decode('utf-8') |
| 69 self.assertEqual(ks, content) |
| 70 |
| 71 # finally, try with a 0 length string |
| 72 sfp = StringIO() |
| 73 k = self.bucket.new_key("k") |
| 74 k.set_contents_from_file(sfp) |
| 75 self.assertEqual(k.size, 0) |
| 76 # check actual contents by getting it. |
| 77 kn = self.bucket.new_key("k") |
| 78 ks = kn.get_contents_as_string().decode('utf-8') |
| 79 self.assertEqual(ks, "") |
| 80 |
| 81 def test_set_contents_as_file(self): |
| 82 content="01234567890123456789" |
| 83 sfp = StringIO(content) |
| 84 |
| 85 # fp is set at 0 for just opened (for read) files. |
| 86 # set_contents should write full content to key. |
| 87 k = self.bucket.new_key("k") |
| 88 k.set_contents_from_file(sfp) |
| 89 self.assertEqual(k.size, 20) |
| 90 kn = self.bucket.new_key("k") |
| 91 ks = kn.get_contents_as_string().decode('utf-8') |
| 92 self.assertEqual(ks, content) |
| 93 |
| 94 # set fp to 5 and set contents. this should |
| 95 # set "567890123456789" to the key |
| 96 sfp.seek(5) |
| 97 k = self.bucket.new_key("k") |
| 98 k.set_contents_from_file(sfp) |
| 99 self.assertEqual(k.size, 15) |
| 100 kn = self.bucket.new_key("k") |
| 101 ks = kn.get_contents_as_string().decode('utf-8') |
| 102 self.assertEqual(ks, content[5:]) |
| 103 |
| 104 # set fp to 5 and only set 5 bytes. this should |
| 105 # write the value "56789" to the key. |
| 106 sfp.seek(5) |
| 107 k = self.bucket.new_key("k") |
| 108 k.set_contents_from_file(sfp, size=5) |
| 109 self.assertEqual(k.size, 5) |
| 110 self.assertEqual(sfp.tell(), 10) |
| 111 kn = self.bucket.new_key("k") |
| 112 ks = kn.get_contents_as_string().decode('utf-8') |
| 113 self.assertEqual(ks, content[5:10]) |
| 114 |
| 115 def test_set_contents_with_md5(self): |
| 116 content="01234567890123456789" |
| 117 sfp = StringIO(content) |
| 118 |
| 119 # fp is set at 0 for just opened (for read) files. |
| 120 # set_contents should write full content to key. |
| 121 k = self.bucket.new_key("k") |
| 122 good_md5 = k.compute_md5(sfp) |
| 123 k.set_contents_from_file(sfp, md5=good_md5) |
| 124 kn = self.bucket.new_key("k") |
| 125 ks = kn.get_contents_as_string().decode('utf-8') |
| 126 self.assertEqual(ks, content) |
| 127 |
| 128 # set fp to 5 and only set 5 bytes. this should |
| 129 # write the value "56789" to the key. |
| 130 sfp.seek(5) |
| 131 k = self.bucket.new_key("k") |
| 132 good_md5 = k.compute_md5(sfp, size=5) |
| 133 k.set_contents_from_file(sfp, size=5, md5=good_md5) |
| 134 self.assertEqual(sfp.tell(), 10) |
| 135 kn = self.bucket.new_key("k") |
| 136 ks = kn.get_contents_as_string().decode('utf-8') |
| 137 self.assertEqual(ks, content[5:10]) |
| 138 |
| 139 # let's try a wrong md5 by just altering it. |
| 140 k = self.bucket.new_key("k") |
| 141 sfp.seek(0) |
| 142 hexdig, base64 = k.compute_md5(sfp) |
| 143 bad_md5 = (hexdig, base64[3:]) |
| 144 try: |
| 145 k.set_contents_from_file(sfp, md5=bad_md5) |
| 146 self.fail("should fail with bad md5") |
| 147 except S3ResponseError: |
| 148 pass |
| 149 |
| 150 def test_get_contents_with_md5(self): |
| 151 content="01234567890123456789" |
| 152 sfp = StringIO(content) |
| 153 |
| 154 k = self.bucket.new_key("k") |
| 155 k.set_contents_from_file(sfp) |
| 156 kn = self.bucket.new_key("k") |
| 157 s = kn.get_contents_as_string().decode('utf-8') |
| 158 self.assertEqual(kn.md5, k.md5) |
| 159 self.assertEqual(s, content) |
| 160 |
| 161 def test_file_callback(self): |
| 162 def callback(wrote, total): |
| 163 self.my_cb_cnt += 1 |
| 164 self.assertNotEqual(wrote, self.my_cb_last, "called twice with same
value") |
| 165 self.my_cb_last = wrote |
| 166 |
| 167 # Zero bytes written => 1 call |
| 168 self.my_cb_cnt = 0 |
| 169 self.my_cb_last = None |
| 170 k = self.bucket.new_key("k") |
| 171 k.BufferSize = 2 |
| 172 sfp = StringIO("") |
| 173 k.set_contents_from_file(sfp, cb=callback, num_cb=10) |
| 174 self.assertEqual(self.my_cb_cnt, 1) |
| 175 self.assertEqual(self.my_cb_last, 0) |
| 176 sfp.close() |
| 177 |
| 178 # Read back zero bytes => 1 call |
| 179 self.my_cb_cnt = 0 |
| 180 self.my_cb_last = None |
| 181 s = k.get_contents_as_string(cb=callback) |
| 182 self.assertEqual(self.my_cb_cnt, 1) |
| 183 self.assertEqual(self.my_cb_last, 0) |
| 184 |
| 185 content="01234567890123456789" |
| 186 sfp = StringIO(content) |
| 187 |
| 188 # expect 2 calls due start/finish |
| 189 self.my_cb_cnt = 0 |
| 190 self.my_cb_last = None |
| 191 k = self.bucket.new_key("k") |
| 192 k.set_contents_from_file(sfp, cb=callback, num_cb=10) |
| 193 self.assertEqual(self.my_cb_cnt, 2) |
| 194 self.assertEqual(self.my_cb_last, 20) |
| 195 |
| 196 # Read back all bytes => 2 calls |
| 197 self.my_cb_cnt = 0 |
| 198 self.my_cb_last = None |
| 199 s = k.get_contents_as_string(cb=callback).decode('utf-8') |
| 200 self.assertEqual(self.my_cb_cnt, 2) |
| 201 self.assertEqual(self.my_cb_last, 20) |
| 202 self.assertEqual(s, content) |
| 203 |
| 204 # rewind sfp and try upload again. -1 should call |
| 205 # for every read/write so that should make 11 when bs=2 |
| 206 sfp.seek(0) |
| 207 self.my_cb_cnt = 0 |
| 208 self.my_cb_last = None |
| 209 k = self.bucket.new_key("k") |
| 210 k.BufferSize = 2 |
| 211 k.set_contents_from_file(sfp, cb=callback, num_cb=-1) |
| 212 self.assertEqual(self.my_cb_cnt, 11) |
| 213 self.assertEqual(self.my_cb_last, 20) |
| 214 |
| 215 # Read back all bytes => 11 calls |
| 216 self.my_cb_cnt = 0 |
| 217 self.my_cb_last = None |
| 218 s = k.get_contents_as_string(cb=callback, num_cb=-1).decode('utf-8') |
| 219 self.assertEqual(self.my_cb_cnt, 11) |
| 220 self.assertEqual(self.my_cb_last, 20) |
| 221 self.assertEqual(s, content) |
| 222 |
| 223 # no more than 1 times => 2 times |
| 224 # last time always 20 bytes |
| 225 sfp.seek(0) |
| 226 self.my_cb_cnt = 0 |
| 227 self.my_cb_last = None |
| 228 k = self.bucket.new_key("k") |
| 229 k.BufferSize = 2 |
| 230 k.set_contents_from_file(sfp, cb=callback, num_cb=1) |
| 231 self.assertTrue(self.my_cb_cnt <= 2) |
| 232 self.assertEqual(self.my_cb_last, 20) |
| 233 |
| 234 # no more than 1 times => 2 times |
| 235 self.my_cb_cnt = 0 |
| 236 self.my_cb_last = None |
| 237 s = k.get_contents_as_string(cb=callback, num_cb=1).decode('utf-8') |
| 238 self.assertTrue(self.my_cb_cnt <= 2) |
| 239 self.assertEqual(self.my_cb_last, 20) |
| 240 self.assertEqual(s, content) |
| 241 |
| 242 # no more than 2 times |
| 243 # last time always 20 bytes |
| 244 sfp.seek(0) |
| 245 self.my_cb_cnt = 0 |
| 246 self.my_cb_last = None |
| 247 k = self.bucket.new_key("k") |
| 248 k.BufferSize = 2 |
| 249 k.set_contents_from_file(sfp, cb=callback, num_cb=2) |
| 250 self.assertTrue(self.my_cb_cnt <= 2) |
| 251 self.assertEqual(self.my_cb_last, 20) |
| 252 |
| 253 # no more than 2 times |
| 254 self.my_cb_cnt = 0 |
| 255 self.my_cb_last = None |
| 256 s = k.get_contents_as_string(cb=callback, num_cb=2).decode('utf-8') |
| 257 self.assertTrue(self.my_cb_cnt <= 2) |
| 258 self.assertEqual(self.my_cb_last, 20) |
| 259 self.assertEqual(s, content) |
| 260 |
| 261 # no more than 3 times |
| 262 # last time always 20 bytes |
| 263 sfp.seek(0) |
| 264 self.my_cb_cnt = 0 |
| 265 self.my_cb_last = None |
| 266 k = self.bucket.new_key("k") |
| 267 k.BufferSize = 2 |
| 268 k.set_contents_from_file(sfp, cb=callback, num_cb=3) |
| 269 self.assertTrue(self.my_cb_cnt <= 3) |
| 270 self.assertEqual(self.my_cb_last, 20) |
| 271 |
| 272 # no more than 3 times |
| 273 self.my_cb_cnt = 0 |
| 274 self.my_cb_last = None |
| 275 s = k.get_contents_as_string(cb=callback, num_cb=3).decode('utf-8') |
| 276 self.assertTrue(self.my_cb_cnt <= 3) |
| 277 self.assertEqual(self.my_cb_last, 20) |
| 278 self.assertEqual(s, content) |
| 279 |
| 280 # no more than 4 times |
| 281 # last time always 20 bytes |
| 282 sfp.seek(0) |
| 283 self.my_cb_cnt = 0 |
| 284 self.my_cb_last = None |
| 285 k = self.bucket.new_key("k") |
| 286 k.BufferSize = 2 |
| 287 k.set_contents_from_file(sfp, cb=callback, num_cb=4) |
| 288 self.assertTrue(self.my_cb_cnt <= 4) |
| 289 self.assertEqual(self.my_cb_last, 20) |
| 290 |
| 291 # no more than 4 times |
| 292 self.my_cb_cnt = 0 |
| 293 self.my_cb_last = None |
| 294 s = k.get_contents_as_string(cb=callback, num_cb=4).decode('utf-8') |
| 295 self.assertTrue(self.my_cb_cnt <= 4) |
| 296 self.assertEqual(self.my_cb_last, 20) |
| 297 self.assertEqual(s, content) |
| 298 |
| 299 # no more than 6 times |
| 300 # last time always 20 bytes |
| 301 sfp.seek(0) |
| 302 self.my_cb_cnt = 0 |
| 303 self.my_cb_last = None |
| 304 k = self.bucket.new_key("k") |
| 305 k.BufferSize = 2 |
| 306 k.set_contents_from_file(sfp, cb=callback, num_cb=6) |
| 307 self.assertTrue(self.my_cb_cnt <= 6) |
| 308 self.assertEqual(self.my_cb_last, 20) |
| 309 |
| 310 # no more than 6 times |
| 311 self.my_cb_cnt = 0 |
| 312 self.my_cb_last = None |
| 313 s = k.get_contents_as_string(cb=callback, num_cb=6).decode('utf-8') |
| 314 self.assertTrue(self.my_cb_cnt <= 6) |
| 315 self.assertEqual(self.my_cb_last, 20) |
| 316 self.assertEqual(s, content) |
| 317 |
| 318 # no more than 10 times |
| 319 # last time always 20 bytes |
| 320 sfp.seek(0) |
| 321 self.my_cb_cnt = 0 |
| 322 self.my_cb_last = None |
| 323 k = self.bucket.new_key("k") |
| 324 k.BufferSize = 2 |
| 325 k.set_contents_from_file(sfp, cb=callback, num_cb=10) |
| 326 self.assertTrue(self.my_cb_cnt <= 10) |
| 327 self.assertEqual(self.my_cb_last, 20) |
| 328 |
| 329 # no more than 10 times |
| 330 self.my_cb_cnt = 0 |
| 331 self.my_cb_last = None |
| 332 s = k.get_contents_as_string(cb=callback, num_cb=10).decode('utf-8') |
| 333 self.assertTrue(self.my_cb_cnt <= 10) |
| 334 self.assertEqual(self.my_cb_last, 20) |
| 335 self.assertEqual(s, content) |
| 336 |
| 337 # no more than 1000 times |
| 338 # last time always 20 bytes |
| 339 sfp.seek(0) |
| 340 self.my_cb_cnt = 0 |
| 341 self.my_cb_last = None |
| 342 k = self.bucket.new_key("k") |
| 343 k.BufferSize = 2 |
| 344 k.set_contents_from_file(sfp, cb=callback, num_cb=1000) |
| 345 self.assertTrue(self.my_cb_cnt <= 1000) |
| 346 self.assertEqual(self.my_cb_last, 20) |
| 347 |
| 348 # no more than 1000 times |
| 349 self.my_cb_cnt = 0 |
| 350 self.my_cb_last = None |
| 351 s = k.get_contents_as_string(cb=callback, num_cb=1000).decode('utf-8') |
| 352 self.assertTrue(self.my_cb_cnt <= 1000) |
| 353 self.assertEqual(self.my_cb_last, 20) |
| 354 self.assertEqual(s, content) |
| 355 |
| 356 def test_website_redirects(self): |
| 357 self.bucket.configure_website('index.html') |
| 358 key = self.bucket.new_key('redirect-key') |
| 359 self.assertTrue(key.set_redirect('http://www.amazon.com/')) |
| 360 self.assertEqual(key.get_redirect(), 'http://www.amazon.com/') |
| 361 |
| 362 self.assertTrue(key.set_redirect('http://aws.amazon.com/')) |
| 363 self.assertEqual(key.get_redirect(), 'http://aws.amazon.com/') |
| 364 |
| 365 def test_website_redirect_none_configured(self): |
| 366 key = self.bucket.new_key('redirect-key') |
| 367 key.set_contents_from_string('') |
| 368 self.assertEqual(key.get_redirect(), None) |
| 369 |
| 370 def test_website_redirect_with_bad_value(self): |
| 371 self.bucket.configure_website('index.html') |
| 372 key = self.bucket.new_key('redirect-key') |
| 373 with self.assertRaises(key.provider.storage_response_error): |
| 374 # Must start with a / or http |
| 375 key.set_redirect('ftp://ftp.example.org') |
| 376 with self.assertRaises(key.provider.storage_response_error): |
| 377 # Must start with a / or http |
| 378 key.set_redirect('') |
| 379 |
| 380 def test_setting_date(self): |
| 381 key = self.bucket.new_key('test_date') |
| 382 # This should actually set x-amz-meta-date & not fail miserably. |
| 383 key.set_metadata('date', '20130524T155935Z') |
| 384 key.set_contents_from_string('Some text here.') |
| 385 |
| 386 check = self.bucket.get_key('test_date') |
| 387 self.assertEqual(check.get_metadata('date'), u'20130524T155935Z') |
| 388 self.assertTrue('x-amz-meta-date' in check._get_remote_metadata()) |
| 389 |
| 390 def test_header_casing(self): |
| 391 key = self.bucket.new_key('test_header_case') |
| 392 # Using anything but CamelCase on ``Content-Type`` or ``Content-MD5`` |
| 393 # used to cause a signature error (when using ``s3`` for signing). |
| 394 key.set_metadata('Content-type', 'application/json') |
| 395 key.set_metadata('Content-md5', 'XmUKnus7svY1frWsVskxXg==') |
| 396 key.set_contents_from_string('{"abc": 123}') |
| 397 |
| 398 check = self.bucket.get_key('test_header_case') |
| 399 self.assertEqual(check.content_type, 'application/json') |
| 400 |
| 401 def test_header_encoding(self): |
| 402 key = self.bucket.new_key('test_header_encoding') |
| 403 |
| 404 key.set_metadata('Cache-control', u'public, max-age=500') |
| 405 key.set_metadata('Test-Plus', u'A plus (+)') |
| 406 key.set_metadata('Content-disposition', u'filename=Schöne Zeit.txt') |
| 407 key.set_metadata('Content-Encoding', 'gzip') |
| 408 key.set_metadata('Content-Language', 'de') |
| 409 key.set_metadata('Content-Type', 'application/pdf') |
| 410 self.assertEqual(key.content_type, 'application/pdf') |
| 411 key.set_metadata('X-Robots-Tag', 'all') |
| 412 key.set_metadata('Expires', u'Thu, 01 Dec 1994 16:00:00 GMT') |
| 413 key.set_contents_from_string('foo') |
| 414 |
| 415 check = self.bucket.get_key('test_header_encoding') |
| 416 remote_metadata = check._get_remote_metadata() |
| 417 |
| 418 # TODO: investigate whether encoding ' ' as '%20' makes sense |
| 419 self.assertEqual(check.cache_control, 'public,%20max-age=500') |
| 420 self.assertEqual(remote_metadata['cache-control'], 'public,%20max-age=50
0') |
| 421 self.assertEqual(check.get_metadata('test-plus'), 'A plus (+)') |
| 422 self.assertEqual(check.content_disposition, 'filename=Sch%C3%B6ne%20Zeit
.txt') |
| 423 self.assertEqual(remote_metadata['content-disposition'], 'filename=Sch%C
3%B6ne%20Zeit.txt') |
| 424 self.assertEqual(check.content_encoding, 'gzip') |
| 425 self.assertEqual(remote_metadata['content-encoding'], 'gzip') |
| 426 self.assertEqual(check.content_language, 'de') |
| 427 self.assertEqual(remote_metadata['content-language'], 'de') |
| 428 self.assertEqual(check.content_type, 'application/pdf') |
| 429 self.assertEqual(remote_metadata['content-type'], 'application/pdf') |
| 430 self.assertEqual(check.x_robots_tag, 'all') |
| 431 self.assertEqual(remote_metadata['x-robots-tag'], 'all') |
| 432 self.assertEqual(check.expires, 'Thu,%2001%20Dec%201994%2016:00:00%20GMT
') |
| 433 self.assertEqual(remote_metadata['expires'], 'Thu,%2001%20Dec%201994%201
6:00:00%20GMT') |
| 434 |
| 435 expected = u'filename=Schöne Zeit.txt' |
| 436 if six.PY2: |
| 437 # Newer versions of python default to unicode strings, but python 2 |
| 438 # requires encoding to UTF-8 to compare the two properly |
| 439 expected = expected.encode('utf-8') |
| 440 |
| 441 self.assertEqual( |
| 442 urllib.parse.unquote(check.content_disposition), |
| 443 expected |
| 444 ) |
| 445 |
| 446 def test_set_contents_with_sse_c(self): |
| 447 content="01234567890123456789" |
| 448 # the plain text of customer key is "01testKeyToSSEC!" |
| 449 header = { |
| 450 "x-amz-server-side-encryption-customer-algorithm" : |
| 451 "AES256", |
| 452 "x-amz-server-side-encryption-customer-key" : |
| 453 "MAAxAHQAZQBzAHQASwBlAHkAVABvAFMAUwBFAEMAIQA=", |
| 454 "x-amz-server-side-encryption-customer-key-MD5" : |
| 455 "fUgCZDDh6bfEMuP2bN38mg==" |
| 456 } |
| 457 # upload and download content with AWS specified headers |
| 458 k = self.bucket.new_key("testkey_for_sse_c") |
| 459 k.set_contents_from_string(content, headers=header) |
| 460 kn = self.bucket.new_key("testkey_for_sse_c") |
| 461 ks = kn.get_contents_as_string(headers=header) |
| 462 self.assertEqual(ks, content.encode('utf-8')) |
| 463 |
| 464 |
| 465 class S3KeySigV4Test(unittest.TestCase): |
| 466 def setUp(self): |
| 467 self.conn = boto.s3.connect_to_region('eu-central-1') |
| 468 self.bucket_name = 'boto-sigv4-key-%d' % int(time.time()) |
| 469 self.bucket = self.conn.create_bucket(self.bucket_name, |
| 470 location='eu-central-1') |
| 471 |
| 472 def tearDown(self): |
| 473 for key in self.bucket: |
| 474 key.delete() |
| 475 self.bucket.delete() |
| 476 |
| 477 def test_put_get_with_non_string_headers_key(self): |
| 478 k = Key(self.bucket) |
| 479 k.key = 'foobar' |
| 480 body = 'This is a test of S3' |
| 481 # A content-length header will be added to this request since it |
| 482 # has a body. |
| 483 k.set_contents_from_string(body) |
| 484 # Set a header that has an integer. This checks for a bug where |
| 485 # the sigv4 signer assumes that all of the headers are strings. |
| 486 headers = {'Content-Length': 0} |
| 487 from_s3_key = self.bucket.get_key('foobar', headers=headers) |
| 488 self.assertEqual(from_s3_key.get_contents_as_string().decode('utf-8'), |
| 489 body) |
| 490 |
| 491 |
| 492 class S3KeyVersionCopyTest(unittest.TestCase): |
| 493 def setUp(self): |
| 494 self.conn = S3Connection() |
| 495 self.bucket_name = 'boto-key-version-copy-%d' % int(time.time()) |
| 496 self.bucket = self.conn.create_bucket(self.bucket_name) |
| 497 self.bucket.configure_versioning(True) |
| 498 |
| 499 def tearDown(self): |
| 500 for key in self.bucket.list_versions(): |
| 501 key.delete() |
| 502 self.bucket.delete() |
| 503 |
| 504 def test_key_overwrite_and_copy(self): |
| 505 first_content = "abcdefghijklm" |
| 506 second_content = "nopqrstuvwxyz" |
| 507 k = Key(self.bucket, 'testkey') |
| 508 k.set_contents_from_string(first_content) |
| 509 # Wait for S3's eventual consistency (may not be necessary) |
| 510 while self.bucket.get_key('testkey') is None: |
| 511 time.sleep(5) |
| 512 # Get the first version_id |
| 513 first_key = self.bucket.get_key('testkey') |
| 514 first_version_id = first_key.version_id |
| 515 # Overwrite the key |
| 516 k = Key(self.bucket, 'testkey') |
| 517 k.set_contents_from_string(second_content) |
| 518 # Wait for eventual consistency |
| 519 while True: |
| 520 second_key = self.bucket.get_key('testkey') |
| 521 if second_key is None or second_key.version_id == first_version_id: |
| 522 time.sleep(5) |
| 523 else: |
| 524 break |
| 525 # Copy first key (no longer the current version) to a new key |
| 526 source_key = self.bucket.get_key('testkey', |
| 527 version_id=first_version_id) |
| 528 source_key.copy(self.bucket, 'copiedkey') |
| 529 while self.bucket.get_key('copiedkey') is None: |
| 530 time.sleep(5) |
| 531 copied_key = self.bucket.get_key('copiedkey') |
| 532 copied_key_contents = copied_key.get_contents_as_string() |
| 533 self.assertEqual(first_content, copied_key_contents) |
| 534 |
OLD | NEW |