| Index: tests/s3/test_gsconnection.py
|
| diff --git a/tests/s3/test_gsconnection.py b/tests/s3/test_gsconnection.py
|
| index e38f442165eee500053db1ec1bf3233b1e374bd4..c8c6d585fee9588a31a736f127ff6837735bc46a 100644
|
| --- a/tests/s3/test_gsconnection.py
|
| +++ b/tests/s3/test_gsconnection.py
|
| @@ -31,10 +31,12 @@ import unittest
|
| import time
|
| import os
|
| from boto.gs.connection import GSConnection
|
| +from boto import storage_uri
|
|
|
| class GSConnectionTest (unittest.TestCase):
|
|
|
| def test_1_basic(self):
|
| + """basic regression test for Google Cloud Storage"""
|
| print '--- running GSConnection tests ---'
|
| c = GSConnection()
|
| # create a new, empty bucket
|
| @@ -176,4 +178,110 @@ class GSConnectionTest (unittest.TestCase):
|
| # now delete bucket
|
| time.sleep(5)
|
| c.delete_bucket(bucket)
|
| +
|
| + def test_2_copy_key(self):
|
| + """test copying a key from one bucket to another"""
|
| + c = GSConnection()
|
| + # create two new, empty buckets
|
| + bucket_name_1 = 'test1-%d' % int(time.time())
|
| + bucket_name_2 = 'test2-%d' % int(time.time())
|
| + bucket1 = c.create_bucket(bucket_name_1)
|
| + bucket2 = c.create_bucket(bucket_name_2)
|
| + # verify buckets got created
|
| + bucket1 = c.get_bucket(bucket_name_1)
|
| + bucket2 = c.get_bucket(bucket_name_2)
|
| + # create a key in bucket1 and give it some content
|
| + k1 = bucket1.new_key()
|
| + assert isinstance(k1, bucket1.key_class)
|
| + key_name = 'foobar'
|
| + k1.name = key_name
|
| + s = 'This is a test.'
|
| + k1.set_contents_from_string(s)
|
| + # copy the new key from bucket1 to bucket2
|
| + k1.copy(bucket_name_2, key_name)
|
| + # now copy the contents from bucket2 to a local file
|
| + k2 = bucket2.lookup(key_name)
|
| + assert isinstance(k2, bucket2.key_class)
|
| + fp = open('foobar', 'wb')
|
| + k2.get_contents_to_file(fp)
|
| + fp.close()
|
| + fp = open('foobar')
|
| + # check to make sure content read is identical to original
|
| + assert s == fp.read(), 'move test failed!'
|
| + fp.close()
|
| + # delete keys
|
| + bucket1.delete_key(k1)
|
| + bucket2.delete_key(k2)
|
| + # delete test buckets
|
| + c.delete_bucket(bucket1)
|
| + c.delete_bucket(bucket2)
|
| +
|
| + def test_3_default_object_acls(self):
|
| + """test default object acls"""
|
| + c = GSConnection()
|
| + # create a new bucket
|
| + bucket_name = 'test-%d' % int(time.time())
|
| + bucket = c.create_bucket(bucket_name)
|
| + # now call get_bucket to see if it's really there
|
| + bucket = c.get_bucket(bucket_name)
|
| + # get default acl and make sure it's empty
|
| + acl = bucket.get_def_acl()
|
| + assert acl.to_xml() == '<AccessControlList></AccessControlList>'
|
| + # set default acl to a canned acl and verify it gets set
|
| + bucket.set_def_acl('public-read')
|
| + acl = bucket.get_def_acl()
|
| + # save public-read acl for later test
|
| + public_read_acl = acl
|
| + assert acl.to_xml() == ('<AccessControlList><Entries><Entry>' +
|
| + '<Scope type="AllUsers"></Scope><Permission>READ</Permission>' +
|
| + '</Entry></Entries></AccessControlList>')
|
| + # back to private acl
|
| + bucket.set_def_acl('private')
|
| + acl = bucket.get_def_acl()
|
| + assert acl.to_xml() == '<AccessControlList></AccessControlList>'
|
| + # set default acl to an xml acl and verify it gets set
|
| + bucket.set_def_acl(public_read_acl)
|
| + acl = bucket.get_def_acl()
|
| + assert acl.to_xml() == ('<AccessControlList><Entries><Entry>' +
|
| + '<Scope type="AllUsers"></Scope><Permission>READ</Permission>' +
|
| + '</Entry></Entries></AccessControlList>')
|
| + # back to private acl
|
| + bucket.set_def_acl('private')
|
| + acl = bucket.get_def_acl()
|
| + assert acl.to_xml() == '<AccessControlList></AccessControlList>'
|
| + # delete bucket
|
| + c.delete_bucket(bucket)
|
| + # repeat default acl tests using boto's storage_uri interface
|
| + # create a new bucket
|
| + bucket_name = 'test-%d' % int(time.time())
|
| + uri = storage_uri('gs://' + bucket_name)
|
| + uri.create_bucket()
|
| + # get default acl and make sure it's empty
|
| + acl = uri.get_def_acl()
|
| + assert acl.to_xml() == '<AccessControlList></AccessControlList>'
|
| + # set default acl to a canned acl and verify it gets set
|
| + uri.set_def_acl('public-read')
|
| + acl = uri.get_def_acl()
|
| + # save public-read acl for later test
|
| + public_read_acl = acl
|
| + assert acl.to_xml() == ('<AccessControlList><Entries><Entry>' +
|
| + '<Scope type="AllUsers"></Scope><Permission>READ</Permission>' +
|
| + '</Entry></Entries></AccessControlList>')
|
| + # back to private acl
|
| + uri.set_def_acl('private')
|
| + acl = uri.get_def_acl()
|
| + assert acl.to_xml() == '<AccessControlList></AccessControlList>'
|
| + # set default acl to an xml acl and verify it gets set
|
| + uri.set_def_acl(public_read_acl)
|
| + acl = uri.get_def_acl()
|
| + assert acl.to_xml() == ('<AccessControlList><Entries><Entry>' +
|
| + '<Scope type="AllUsers"></Scope><Permission>READ</Permission>' +
|
| + '</Entry></Entries></AccessControlList>')
|
| + # back to private acl
|
| + uri.set_def_acl('private')
|
| + acl = uri.get_def_acl()
|
| + assert acl.to_xml() == '<AccessControlList></AccessControlList>'
|
| + # delete bucket
|
| + uri.delete_bucket()
|
| +
|
| print '--- tests completed ---'
|
|
|