Index: tests/s3/test_gsconnection.py |
diff --git a/tests/s3/test_gsconnection.py b/tests/s3/test_gsconnection.py |
index e38f442165eee500053db1ec1bf3233b1e374bd4..c8c6d585fee9588a31a736f127ff6837735bc46a 100644 |
--- a/tests/s3/test_gsconnection.py |
+++ b/tests/s3/test_gsconnection.py |
@@ -31,10 +31,12 @@ import unittest |
import time |
import os |
from boto.gs.connection import GSConnection |
+from boto import storage_uri |
class GSConnectionTest (unittest.TestCase): |
def test_1_basic(self): |
+ """basic regression test for Google Cloud Storage""" |
print '--- running GSConnection tests ---' |
c = GSConnection() |
# create a new, empty bucket |
@@ -176,4 +178,110 @@ class GSConnectionTest (unittest.TestCase): |
# now delete bucket |
time.sleep(5) |
c.delete_bucket(bucket) |
+ |
+ def test_2_copy_key(self): |
+ """test copying a key from one bucket to another""" |
+ c = GSConnection() |
+ # create two new, empty buckets |
+ bucket_name_1 = 'test1-%d' % int(time.time()) |
+ bucket_name_2 = 'test2-%d' % int(time.time()) |
+ bucket1 = c.create_bucket(bucket_name_1) |
+ bucket2 = c.create_bucket(bucket_name_2) |
+ # verify buckets got created |
+ bucket1 = c.get_bucket(bucket_name_1) |
+ bucket2 = c.get_bucket(bucket_name_2) |
+ # create a key in bucket1 and give it some content |
+ k1 = bucket1.new_key() |
+ assert isinstance(k1, bucket1.key_class) |
+ key_name = 'foobar' |
+ k1.name = key_name |
+ s = 'This is a test.' |
+ k1.set_contents_from_string(s) |
+ # copy the new key from bucket1 to bucket2 |
+ k1.copy(bucket_name_2, key_name) |
+ # now copy the contents from bucket2 to a local file |
+ k2 = bucket2.lookup(key_name) |
+ assert isinstance(k2, bucket2.key_class) |
+ fp = open('foobar', 'wb') |
+ k2.get_contents_to_file(fp) |
+ fp.close() |
+ fp = open('foobar') |
+ # check to make sure content read is identical to original |
+ assert s == fp.read(), 'move test failed!' |
+ fp.close() |
+ # delete keys |
+ bucket1.delete_key(k1) |
+ bucket2.delete_key(k2) |
+ # delete test buckets |
+ c.delete_bucket(bucket1) |
+ c.delete_bucket(bucket2) |
+ |
+ def test_3_default_object_acls(self): |
+ """test default object acls""" |
+ c = GSConnection() |
+ # create a new bucket |
+ bucket_name = 'test-%d' % int(time.time()) |
+ bucket = c.create_bucket(bucket_name) |
+ # now call get_bucket to see if it's really there |
+ bucket = c.get_bucket(bucket_name) |
+ # get default acl and make sure it's empty |
+ acl = bucket.get_def_acl() |
+ assert acl.to_xml() == '<AccessControlList></AccessControlList>' |
+ # set default acl to a canned acl and verify it gets set |
+ bucket.set_def_acl('public-read') |
+ acl = bucket.get_def_acl() |
+ # save public-read acl for later test |
+ public_read_acl = acl |
+ assert acl.to_xml() == ('<AccessControlList><Entries><Entry>' + |
+ '<Scope type="AllUsers"></Scope><Permission>READ</Permission>' + |
+ '</Entry></Entries></AccessControlList>') |
+ # back to private acl |
+ bucket.set_def_acl('private') |
+ acl = bucket.get_def_acl() |
+ assert acl.to_xml() == '<AccessControlList></AccessControlList>' |
+ # set default acl to an xml acl and verify it gets set |
+ bucket.set_def_acl(public_read_acl) |
+ acl = bucket.get_def_acl() |
+ assert acl.to_xml() == ('<AccessControlList><Entries><Entry>' + |
+ '<Scope type="AllUsers"></Scope><Permission>READ</Permission>' + |
+ '</Entry></Entries></AccessControlList>') |
+ # back to private acl |
+ bucket.set_def_acl('private') |
+ acl = bucket.get_def_acl() |
+ assert acl.to_xml() == '<AccessControlList></AccessControlList>' |
+ # delete bucket |
+ c.delete_bucket(bucket) |
+ # repeat default acl tests using boto's storage_uri interface |
+ # create a new bucket |
+ bucket_name = 'test-%d' % int(time.time()) |
+ uri = storage_uri('gs://' + bucket_name) |
+ uri.create_bucket() |
+ # get default acl and make sure it's empty |
+ acl = uri.get_def_acl() |
+ assert acl.to_xml() == '<AccessControlList></AccessControlList>' |
+ # set default acl to a canned acl and verify it gets set |
+ uri.set_def_acl('public-read') |
+ acl = uri.get_def_acl() |
+ # save public-read acl for later test |
+ public_read_acl = acl |
+ assert acl.to_xml() == ('<AccessControlList><Entries><Entry>' + |
+ '<Scope type="AllUsers"></Scope><Permission>READ</Permission>' + |
+ '</Entry></Entries></AccessControlList>') |
+ # back to private acl |
+ uri.set_def_acl('private') |
+ acl = uri.get_def_acl() |
+ assert acl.to_xml() == '<AccessControlList></AccessControlList>' |
+ # set default acl to an xml acl and verify it gets set |
+ uri.set_def_acl(public_read_acl) |
+ acl = uri.get_def_acl() |
+ assert acl.to_xml() == ('<AccessControlList><Entries><Entry>' + |
+ '<Scope type="AllUsers"></Scope><Permission>READ</Permission>' + |
+ '</Entry></Entries></AccessControlList>') |
+ # back to private acl |
+ uri.set_def_acl('private') |
+ acl = uri.get_def_acl() |
+ assert acl.to_xml() == '<AccessControlList></AccessControlList>' |
+ # delete bucket |
+ uri.delete_bucket() |
+ |
print '--- tests completed ---' |