| Index: tools/telemetry/third_party/gsutilz/third_party/boto/tests/integration/s3/test_key.py
|
| diff --git a/tools/telemetry/third_party/gsutilz/third_party/boto/tests/integration/s3/test_key.py b/tools/telemetry/third_party/gsutilz/third_party/boto/tests/integration/s3/test_key.py
|
| deleted file mode 100644
|
| index 8d426a267b8daa9ded2a22ef7e71d1f7cc5102c6..0000000000000000000000000000000000000000
|
| --- a/tools/telemetry/third_party/gsutilz/third_party/boto/tests/integration/s3/test_key.py
|
| +++ /dev/null
|
| @@ -1,534 +0,0 @@
|
| -# -*- coding: utf-8 -*-
|
| -# Copyright (c) 2012 Mitch Garnaat http://garnaat.org/
|
| -# All rights reserved.
|
| -#
|
| -# Permission is hereby granted, free of charge, to any person obtaining a
|
| -# copy of this software and associated documentation files (the
|
| -# "Software"), to deal in the Software without restriction, including
|
| -# without limitation the rights to use, copy, modify, merge, publish, dis-
|
| -# tribute, sublicense, and/or sell copies of the Software, and to permit
|
| -# persons to whom the Software is furnished to do so, subject to the fol-
|
| -# lowing conditions:
|
| -#
|
| -# The above copyright notice and this permission notice shall be included
|
| -# in all copies or substantial portions of the Software.
|
| -#
|
| -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
| -# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
|
| -# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
|
| -# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
| -# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
| -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
| -# IN THE SOFTWARE.
|
| -
|
| -"""
|
| -Some unit tests for S3 Key
|
| -"""
|
| -
|
| -from tests.unit import unittest
|
| -import time
|
| -
|
| -import boto.s3
|
| -from boto.compat import six, StringIO, urllib
|
| -from boto.s3.connection import S3Connection
|
| -from boto.s3.key import Key
|
| -from boto.exception import S3ResponseError
|
| -
|
| -
|
| -class S3KeyTest(unittest.TestCase):
|
| - s3 = True
|
| -
|
| - def setUp(self):
|
| - self.conn = S3Connection()
|
| - self.bucket_name = 'keytest-%d' % int(time.time())
|
| - self.bucket = self.conn.create_bucket(self.bucket_name)
|
| -
|
| - def tearDown(self):
|
| - for key in self.bucket:
|
| - key.delete()
|
| - self.bucket.delete()
|
| -
|
| - def test_set_contents_from_file_dataloss(self):
|
| - # Create an empty stringio and write to it.
|
| - content = "abcde"
|
| - sfp = StringIO()
|
| - sfp.write(content)
|
| - # Try set_contents_from_file() without rewinding sfp
|
| - k = self.bucket.new_key("k")
|
| - try:
|
| - k.set_contents_from_file(sfp)
|
| - self.fail("forgot to rewind so should fail.")
|
| - except AttributeError:
|
| - pass
|
| - # call with rewind and check if we wrote 5 bytes
|
| - k.set_contents_from_file(sfp, rewind=True)
|
| - self.assertEqual(k.size, 5)
|
| - # check actual contents by getting it.
|
| - kn = self.bucket.new_key("k")
|
| - ks = kn.get_contents_as_string().decode('utf-8')
|
| - self.assertEqual(ks, content)
|
| -
|
| - # finally, try with a 0 length string
|
| - sfp = StringIO()
|
| - k = self.bucket.new_key("k")
|
| - k.set_contents_from_file(sfp)
|
| - self.assertEqual(k.size, 0)
|
| - # check actual contents by getting it.
|
| - kn = self.bucket.new_key("k")
|
| - ks = kn.get_contents_as_string().decode('utf-8')
|
| - self.assertEqual(ks, "")
|
| -
|
| - def test_set_contents_as_file(self):
|
| - content="01234567890123456789"
|
| - sfp = StringIO(content)
|
| -
|
| - # fp is set at 0 for just opened (for read) files.
|
| - # set_contents should write full content to key.
|
| - k = self.bucket.new_key("k")
|
| - k.set_contents_from_file(sfp)
|
| - self.assertEqual(k.size, 20)
|
| - kn = self.bucket.new_key("k")
|
| - ks = kn.get_contents_as_string().decode('utf-8')
|
| - self.assertEqual(ks, content)
|
| -
|
| - # set fp to 5 and set contents. this should
|
| - # set "567890123456789" to the key
|
| - sfp.seek(5)
|
| - k = self.bucket.new_key("k")
|
| - k.set_contents_from_file(sfp)
|
| - self.assertEqual(k.size, 15)
|
| - kn = self.bucket.new_key("k")
|
| - ks = kn.get_contents_as_string().decode('utf-8')
|
| - self.assertEqual(ks, content[5:])
|
| -
|
| - # set fp to 5 and only set 5 bytes. this should
|
| - # write the value "56789" to the key.
|
| - sfp.seek(5)
|
| - k = self.bucket.new_key("k")
|
| - k.set_contents_from_file(sfp, size=5)
|
| - self.assertEqual(k.size, 5)
|
| - self.assertEqual(sfp.tell(), 10)
|
| - kn = self.bucket.new_key("k")
|
| - ks = kn.get_contents_as_string().decode('utf-8')
|
| - self.assertEqual(ks, content[5:10])
|
| -
|
| - def test_set_contents_with_md5(self):
|
| - content="01234567890123456789"
|
| - sfp = StringIO(content)
|
| -
|
| - # fp is set at 0 for just opened (for read) files.
|
| - # set_contents should write full content to key.
|
| - k = self.bucket.new_key("k")
|
| - good_md5 = k.compute_md5(sfp)
|
| - k.set_contents_from_file(sfp, md5=good_md5)
|
| - kn = self.bucket.new_key("k")
|
| - ks = kn.get_contents_as_string().decode('utf-8')
|
| - self.assertEqual(ks, content)
|
| -
|
| - # set fp to 5 and only set 5 bytes. this should
|
| - # write the value "56789" to the key.
|
| - sfp.seek(5)
|
| - k = self.bucket.new_key("k")
|
| - good_md5 = k.compute_md5(sfp, size=5)
|
| - k.set_contents_from_file(sfp, size=5, md5=good_md5)
|
| - self.assertEqual(sfp.tell(), 10)
|
| - kn = self.bucket.new_key("k")
|
| - ks = kn.get_contents_as_string().decode('utf-8')
|
| - self.assertEqual(ks, content[5:10])
|
| -
|
| - # let's try a wrong md5 by just altering it.
|
| - k = self.bucket.new_key("k")
|
| - sfp.seek(0)
|
| - hexdig, base64 = k.compute_md5(sfp)
|
| - bad_md5 = (hexdig, base64[3:])
|
| - try:
|
| - k.set_contents_from_file(sfp, md5=bad_md5)
|
| - self.fail("should fail with bad md5")
|
| - except S3ResponseError:
|
| - pass
|
| -
|
| - def test_get_contents_with_md5(self):
|
| - content="01234567890123456789"
|
| - sfp = StringIO(content)
|
| -
|
| - k = self.bucket.new_key("k")
|
| - k.set_contents_from_file(sfp)
|
| - kn = self.bucket.new_key("k")
|
| - s = kn.get_contents_as_string().decode('utf-8')
|
| - self.assertEqual(kn.md5, k.md5)
|
| - self.assertEqual(s, content)
|
| -
|
| - def test_file_callback(self):
|
| - def callback(wrote, total):
|
| - self.my_cb_cnt += 1
|
| - self.assertNotEqual(wrote, self.my_cb_last, "called twice with same value")
|
| - self.my_cb_last = wrote
|
| -
|
| - # Zero bytes written => 1 call
|
| - self.my_cb_cnt = 0
|
| - self.my_cb_last = None
|
| - k = self.bucket.new_key("k")
|
| - k.BufferSize = 2
|
| - sfp = StringIO("")
|
| - k.set_contents_from_file(sfp, cb=callback, num_cb=10)
|
| - self.assertEqual(self.my_cb_cnt, 1)
|
| - self.assertEqual(self.my_cb_last, 0)
|
| - sfp.close()
|
| -
|
| - # Read back zero bytes => 1 call
|
| - self.my_cb_cnt = 0
|
| - self.my_cb_last = None
|
| - s = k.get_contents_as_string(cb=callback)
|
| - self.assertEqual(self.my_cb_cnt, 1)
|
| - self.assertEqual(self.my_cb_last, 0)
|
| -
|
| - content="01234567890123456789"
|
| - sfp = StringIO(content)
|
| -
|
| - # expect 2 calls due start/finish
|
| - self.my_cb_cnt = 0
|
| - self.my_cb_last = None
|
| - k = self.bucket.new_key("k")
|
| - k.set_contents_from_file(sfp, cb=callback, num_cb=10)
|
| - self.assertEqual(self.my_cb_cnt, 2)
|
| - self.assertEqual(self.my_cb_last, 20)
|
| -
|
| - # Read back all bytes => 2 calls
|
| - self.my_cb_cnt = 0
|
| - self.my_cb_last = None
|
| - s = k.get_contents_as_string(cb=callback).decode('utf-8')
|
| - self.assertEqual(self.my_cb_cnt, 2)
|
| - self.assertEqual(self.my_cb_last, 20)
|
| - self.assertEqual(s, content)
|
| -
|
| - # rewind sfp and try upload again. -1 should call
|
| - # for every read/write so that should make 11 when bs=2
|
| - sfp.seek(0)
|
| - self.my_cb_cnt = 0
|
| - self.my_cb_last = None
|
| - k = self.bucket.new_key("k")
|
| - k.BufferSize = 2
|
| - k.set_contents_from_file(sfp, cb=callback, num_cb=-1)
|
| - self.assertEqual(self.my_cb_cnt, 11)
|
| - self.assertEqual(self.my_cb_last, 20)
|
| -
|
| - # Read back all bytes => 11 calls
|
| - self.my_cb_cnt = 0
|
| - self.my_cb_last = None
|
| - s = k.get_contents_as_string(cb=callback, num_cb=-1).decode('utf-8')
|
| - self.assertEqual(self.my_cb_cnt, 11)
|
| - self.assertEqual(self.my_cb_last, 20)
|
| - self.assertEqual(s, content)
|
| -
|
| - # no more than 1 times => 2 times
|
| - # last time always 20 bytes
|
| - sfp.seek(0)
|
| - self.my_cb_cnt = 0
|
| - self.my_cb_last = None
|
| - k = self.bucket.new_key("k")
|
| - k.BufferSize = 2
|
| - k.set_contents_from_file(sfp, cb=callback, num_cb=1)
|
| - self.assertTrue(self.my_cb_cnt <= 2)
|
| - self.assertEqual(self.my_cb_last, 20)
|
| -
|
| - # no more than 1 times => 2 times
|
| - self.my_cb_cnt = 0
|
| - self.my_cb_last = None
|
| - s = k.get_contents_as_string(cb=callback, num_cb=1).decode('utf-8')
|
| - self.assertTrue(self.my_cb_cnt <= 2)
|
| - self.assertEqual(self.my_cb_last, 20)
|
| - self.assertEqual(s, content)
|
| -
|
| - # no more than 2 times
|
| - # last time always 20 bytes
|
| - sfp.seek(0)
|
| - self.my_cb_cnt = 0
|
| - self.my_cb_last = None
|
| - k = self.bucket.new_key("k")
|
| - k.BufferSize = 2
|
| - k.set_contents_from_file(sfp, cb=callback, num_cb=2)
|
| - self.assertTrue(self.my_cb_cnt <= 2)
|
| - self.assertEqual(self.my_cb_last, 20)
|
| -
|
| - # no more than 2 times
|
| - self.my_cb_cnt = 0
|
| - self.my_cb_last = None
|
| - s = k.get_contents_as_string(cb=callback, num_cb=2).decode('utf-8')
|
| - self.assertTrue(self.my_cb_cnt <= 2)
|
| - self.assertEqual(self.my_cb_last, 20)
|
| - self.assertEqual(s, content)
|
| -
|
| - # no more than 3 times
|
| - # last time always 20 bytes
|
| - sfp.seek(0)
|
| - self.my_cb_cnt = 0
|
| - self.my_cb_last = None
|
| - k = self.bucket.new_key("k")
|
| - k.BufferSize = 2
|
| - k.set_contents_from_file(sfp, cb=callback, num_cb=3)
|
| - self.assertTrue(self.my_cb_cnt <= 3)
|
| - self.assertEqual(self.my_cb_last, 20)
|
| -
|
| - # no more than 3 times
|
| - self.my_cb_cnt = 0
|
| - self.my_cb_last = None
|
| - s = k.get_contents_as_string(cb=callback, num_cb=3).decode('utf-8')
|
| - self.assertTrue(self.my_cb_cnt <= 3)
|
| - self.assertEqual(self.my_cb_last, 20)
|
| - self.assertEqual(s, content)
|
| -
|
| - # no more than 4 times
|
| - # last time always 20 bytes
|
| - sfp.seek(0)
|
| - self.my_cb_cnt = 0
|
| - self.my_cb_last = None
|
| - k = self.bucket.new_key("k")
|
| - k.BufferSize = 2
|
| - k.set_contents_from_file(sfp, cb=callback, num_cb=4)
|
| - self.assertTrue(self.my_cb_cnt <= 4)
|
| - self.assertEqual(self.my_cb_last, 20)
|
| -
|
| - # no more than 4 times
|
| - self.my_cb_cnt = 0
|
| - self.my_cb_last = None
|
| - s = k.get_contents_as_string(cb=callback, num_cb=4).decode('utf-8')
|
| - self.assertTrue(self.my_cb_cnt <= 4)
|
| - self.assertEqual(self.my_cb_last, 20)
|
| - self.assertEqual(s, content)
|
| -
|
| - # no more than 6 times
|
| - # last time always 20 bytes
|
| - sfp.seek(0)
|
| - self.my_cb_cnt = 0
|
| - self.my_cb_last = None
|
| - k = self.bucket.new_key("k")
|
| - k.BufferSize = 2
|
| - k.set_contents_from_file(sfp, cb=callback, num_cb=6)
|
| - self.assertTrue(self.my_cb_cnt <= 6)
|
| - self.assertEqual(self.my_cb_last, 20)
|
| -
|
| - # no more than 6 times
|
| - self.my_cb_cnt = 0
|
| - self.my_cb_last = None
|
| - s = k.get_contents_as_string(cb=callback, num_cb=6).decode('utf-8')
|
| - self.assertTrue(self.my_cb_cnt <= 6)
|
| - self.assertEqual(self.my_cb_last, 20)
|
| - self.assertEqual(s, content)
|
| -
|
| - # no more than 10 times
|
| - # last time always 20 bytes
|
| - sfp.seek(0)
|
| - self.my_cb_cnt = 0
|
| - self.my_cb_last = None
|
| - k = self.bucket.new_key("k")
|
| - k.BufferSize = 2
|
| - k.set_contents_from_file(sfp, cb=callback, num_cb=10)
|
| - self.assertTrue(self.my_cb_cnt <= 10)
|
| - self.assertEqual(self.my_cb_last, 20)
|
| -
|
| - # no more than 10 times
|
| - self.my_cb_cnt = 0
|
| - self.my_cb_last = None
|
| - s = k.get_contents_as_string(cb=callback, num_cb=10).decode('utf-8')
|
| - self.assertTrue(self.my_cb_cnt <= 10)
|
| - self.assertEqual(self.my_cb_last, 20)
|
| - self.assertEqual(s, content)
|
| -
|
| - # no more than 1000 times
|
| - # last time always 20 bytes
|
| - sfp.seek(0)
|
| - self.my_cb_cnt = 0
|
| - self.my_cb_last = None
|
| - k = self.bucket.new_key("k")
|
| - k.BufferSize = 2
|
| - k.set_contents_from_file(sfp, cb=callback, num_cb=1000)
|
| - self.assertTrue(self.my_cb_cnt <= 1000)
|
| - self.assertEqual(self.my_cb_last, 20)
|
| -
|
| - # no more than 1000 times
|
| - self.my_cb_cnt = 0
|
| - self.my_cb_last = None
|
| - s = k.get_contents_as_string(cb=callback, num_cb=1000).decode('utf-8')
|
| - self.assertTrue(self.my_cb_cnt <= 1000)
|
| - self.assertEqual(self.my_cb_last, 20)
|
| - self.assertEqual(s, content)
|
| -
|
| - def test_website_redirects(self):
|
| - self.bucket.configure_website('index.html')
|
| - key = self.bucket.new_key('redirect-key')
|
| - self.assertTrue(key.set_redirect('http://www.amazon.com/'))
|
| - self.assertEqual(key.get_redirect(), 'http://www.amazon.com/')
|
| -
|
| - self.assertTrue(key.set_redirect('http://aws.amazon.com/'))
|
| - self.assertEqual(key.get_redirect(), 'http://aws.amazon.com/')
|
| -
|
| - def test_website_redirect_none_configured(self):
|
| - key = self.bucket.new_key('redirect-key')
|
| - key.set_contents_from_string('')
|
| - self.assertEqual(key.get_redirect(), None)
|
| -
|
| - def test_website_redirect_with_bad_value(self):
|
| - self.bucket.configure_website('index.html')
|
| - key = self.bucket.new_key('redirect-key')
|
| - with self.assertRaises(key.provider.storage_response_error):
|
| - # Must start with a / or http
|
| - key.set_redirect('ftp://ftp.example.org')
|
| - with self.assertRaises(key.provider.storage_response_error):
|
| - # Must start with a / or http
|
| - key.set_redirect('')
|
| -
|
| - def test_setting_date(self):
|
| - key = self.bucket.new_key('test_date')
|
| - # This should actually set x-amz-meta-date & not fail miserably.
|
| - key.set_metadata('date', '20130524T155935Z')
|
| - key.set_contents_from_string('Some text here.')
|
| -
|
| - check = self.bucket.get_key('test_date')
|
| - self.assertEqual(check.get_metadata('date'), u'20130524T155935Z')
|
| - self.assertTrue('x-amz-meta-date' in check._get_remote_metadata())
|
| -
|
| - def test_header_casing(self):
|
| - key = self.bucket.new_key('test_header_case')
|
| - # Using anything but CamelCase on ``Content-Type`` or ``Content-MD5``
|
| - # used to cause a signature error (when using ``s3`` for signing).
|
| - key.set_metadata('Content-type', 'application/json')
|
| - key.set_metadata('Content-md5', 'XmUKnus7svY1frWsVskxXg==')
|
| - key.set_contents_from_string('{"abc": 123}')
|
| -
|
| - check = self.bucket.get_key('test_header_case')
|
| - self.assertEqual(check.content_type, 'application/json')
|
| -
|
| - def test_header_encoding(self):
|
| - key = self.bucket.new_key('test_header_encoding')
|
| -
|
| - key.set_metadata('Cache-control', u'public, max-age=500')
|
| - key.set_metadata('Test-Plus', u'A plus (+)')
|
| - key.set_metadata('Content-disposition', u'filename=Schöne Zeit.txt')
|
| - key.set_metadata('Content-Encoding', 'gzip')
|
| - key.set_metadata('Content-Language', 'de')
|
| - key.set_metadata('Content-Type', 'application/pdf')
|
| - self.assertEqual(key.content_type, 'application/pdf')
|
| - key.set_metadata('X-Robots-Tag', 'all')
|
| - key.set_metadata('Expires', u'Thu, 01 Dec 1994 16:00:00 GMT')
|
| - key.set_contents_from_string('foo')
|
| -
|
| - check = self.bucket.get_key('test_header_encoding')
|
| - remote_metadata = check._get_remote_metadata()
|
| -
|
| - # TODO: investigate whether encoding ' ' as '%20' makes sense
|
| - self.assertEqual(check.cache_control, 'public,%20max-age=500')
|
| - self.assertEqual(remote_metadata['cache-control'], 'public,%20max-age=500')
|
| - self.assertEqual(check.get_metadata('test-plus'), 'A plus (+)')
|
| - self.assertEqual(check.content_disposition, 'filename=Sch%C3%B6ne%20Zeit.txt')
|
| - self.assertEqual(remote_metadata['content-disposition'], 'filename=Sch%C3%B6ne%20Zeit.txt')
|
| - self.assertEqual(check.content_encoding, 'gzip')
|
| - self.assertEqual(remote_metadata['content-encoding'], 'gzip')
|
| - self.assertEqual(check.content_language, 'de')
|
| - self.assertEqual(remote_metadata['content-language'], 'de')
|
| - self.assertEqual(check.content_type, 'application/pdf')
|
| - self.assertEqual(remote_metadata['content-type'], 'application/pdf')
|
| - self.assertEqual(check.x_robots_tag, 'all')
|
| - self.assertEqual(remote_metadata['x-robots-tag'], 'all')
|
| - self.assertEqual(check.expires, 'Thu,%2001%20Dec%201994%2016:00:00%20GMT')
|
| - self.assertEqual(remote_metadata['expires'], 'Thu,%2001%20Dec%201994%2016:00:00%20GMT')
|
| -
|
| - expected = u'filename=Schöne Zeit.txt'
|
| - if six.PY2:
|
| - # Newer versions of python default to unicode strings, but python 2
|
| - # requires encoding to UTF-8 to compare the two properly
|
| - expected = expected.encode('utf-8')
|
| -
|
| - self.assertEqual(
|
| - urllib.parse.unquote(check.content_disposition),
|
| - expected
|
| - )
|
| -
|
| - def test_set_contents_with_sse_c(self):
|
| - content="01234567890123456789"
|
| - # the plain text of customer key is "01testKeyToSSEC!"
|
| - header = {
|
| - "x-amz-server-side-encryption-customer-algorithm" :
|
| - "AES256",
|
| - "x-amz-server-side-encryption-customer-key" :
|
| - "MAAxAHQAZQBzAHQASwBlAHkAVABvAFMAUwBFAEMAIQA=",
|
| - "x-amz-server-side-encryption-customer-key-MD5" :
|
| - "fUgCZDDh6bfEMuP2bN38mg=="
|
| - }
|
| - # upload and download content with AWS specified headers
|
| - k = self.bucket.new_key("testkey_for_sse_c")
|
| - k.set_contents_from_string(content, headers=header)
|
| - kn = self.bucket.new_key("testkey_for_sse_c")
|
| - ks = kn.get_contents_as_string(headers=header)
|
| - self.assertEqual(ks, content.encode('utf-8'))
|
| -
|
| -
|
| -class S3KeySigV4Test(unittest.TestCase):
|
| - def setUp(self):
|
| - self.conn = boto.s3.connect_to_region('eu-central-1')
|
| - self.bucket_name = 'boto-sigv4-key-%d' % int(time.time())
|
| - self.bucket = self.conn.create_bucket(self.bucket_name,
|
| - location='eu-central-1')
|
| -
|
| - def tearDown(self):
|
| - for key in self.bucket:
|
| - key.delete()
|
| - self.bucket.delete()
|
| -
|
| - def test_put_get_with_non_string_headers_key(self):
|
| - k = Key(self.bucket)
|
| - k.key = 'foobar'
|
| - body = 'This is a test of S3'
|
| - # A content-length header will be added to this request since it
|
| - # has a body.
|
| - k.set_contents_from_string(body)
|
| - # Set a header that has an integer. This checks for a bug where
|
| - # the sigv4 signer assumes that all of the headers are strings.
|
| - headers = {'Content-Length': 0}
|
| - from_s3_key = self.bucket.get_key('foobar', headers=headers)
|
| - self.assertEqual(from_s3_key.get_contents_as_string().decode('utf-8'),
|
| - body)
|
| -
|
| -
|
| -class S3KeyVersionCopyTest(unittest.TestCase):
|
| - def setUp(self):
|
| - self.conn = S3Connection()
|
| - self.bucket_name = 'boto-key-version-copy-%d' % int(time.time())
|
| - self.bucket = self.conn.create_bucket(self.bucket_name)
|
| - self.bucket.configure_versioning(True)
|
| -
|
| - def tearDown(self):
|
| - for key in self.bucket.list_versions():
|
| - key.delete()
|
| - self.bucket.delete()
|
| -
|
| - def test_key_overwrite_and_copy(self):
|
| - first_content = "abcdefghijklm"
|
| - second_content = "nopqrstuvwxyz"
|
| - k = Key(self.bucket, 'testkey')
|
| - k.set_contents_from_string(first_content)
|
| - # Wait for S3's eventual consistency (may not be necessary)
|
| - while self.bucket.get_key('testkey') is None:
|
| - time.sleep(5)
|
| - # Get the first version_id
|
| - first_key = self.bucket.get_key('testkey')
|
| - first_version_id = first_key.version_id
|
| - # Overwrite the key
|
| - k = Key(self.bucket, 'testkey')
|
| - k.set_contents_from_string(second_content)
|
| - # Wait for eventual consistency
|
| - while True:
|
| - second_key = self.bucket.get_key('testkey')
|
| - if second_key is None or second_key.version_id == first_version_id:
|
| - time.sleep(5)
|
| - else:
|
| - break
|
| - # Copy first key (no longer the current version) to a new key
|
| - source_key = self.bucket.get_key('testkey',
|
| - version_id=first_version_id)
|
| - source_key.copy(self.bucket, 'copiedkey')
|
| - while self.bucket.get_key('copiedkey') is None:
|
| - time.sleep(5)
|
| - copied_key = self.bucket.get_key('copiedkey')
|
| - copied_key_contents = copied_key.get_contents_as_string()
|
| - self.assertEqual(first_content, copied_key_contents)
|
| -
|
|
|