Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(382)

Unified Diff: tests/s3/test_gsconnection.py

Issue 8669001: Pull in upstream boto from github at bcb719937de9ac2851e632d62b777352029a6d55 (Closed) Base URL: svn://svn.chromium.org/boto
Patch Set: Created 9 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « tests/s3/mock_storage_service.py ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: tests/s3/test_gsconnection.py
diff --git a/tests/s3/test_gsconnection.py b/tests/s3/test_gsconnection.py
index e38f442165eee500053db1ec1bf3233b1e374bd4..c8c6d585fee9588a31a736f127ff6837735bc46a 100644
--- a/tests/s3/test_gsconnection.py
+++ b/tests/s3/test_gsconnection.py
@@ -31,10 +31,12 @@ import unittest
import time
import os
from boto.gs.connection import GSConnection
+from boto import storage_uri
class GSConnectionTest (unittest.TestCase):
def test_1_basic(self):
+ """basic regression test for Google Cloud Storage"""
print '--- running GSConnection tests ---'
c = GSConnection()
# create a new, empty bucket
@@ -176,4 +178,110 @@ class GSConnectionTest (unittest.TestCase):
# now delete bucket
time.sleep(5)
c.delete_bucket(bucket)
+
+ def test_2_copy_key(self):
+ """test copying a key from one bucket to another"""
+ c = GSConnection()
+ # create two new, empty buckets
+ bucket_name_1 = 'test1-%d' % int(time.time())
+ bucket_name_2 = 'test2-%d' % int(time.time())
+ bucket1 = c.create_bucket(bucket_name_1)
+ bucket2 = c.create_bucket(bucket_name_2)
+ # verify buckets got created
+ bucket1 = c.get_bucket(bucket_name_1)
+ bucket2 = c.get_bucket(bucket_name_2)
+ # create a key in bucket1 and give it some content
+ k1 = bucket1.new_key()
+ assert isinstance(k1, bucket1.key_class)
+ key_name = 'foobar'
+ k1.name = key_name
+ s = 'This is a test.'
+ k1.set_contents_from_string(s)
+ # copy the new key from bucket1 to bucket2
+ k1.copy(bucket_name_2, key_name)
+ # now copy the contents from bucket2 to a local file
+ k2 = bucket2.lookup(key_name)
+ assert isinstance(k2, bucket2.key_class)
+ fp = open('foobar', 'wb')
+ k2.get_contents_to_file(fp)
+ fp.close()
+ fp = open('foobar')
+ # check to make sure content read is identical to original
+ assert s == fp.read(), 'move test failed!'
+ fp.close()
+ # delete keys
+ bucket1.delete_key(k1)
+ bucket2.delete_key(k2)
+ # delete test buckets
+ c.delete_bucket(bucket1)
+ c.delete_bucket(bucket2)
+
+ def test_3_default_object_acls(self):
+ """test default object acls"""
+ c = GSConnection()
+ # create a new bucket
+ bucket_name = 'test-%d' % int(time.time())
+ bucket = c.create_bucket(bucket_name)
+ # now call get_bucket to see if it's really there
+ bucket = c.get_bucket(bucket_name)
+ # get default acl and make sure it's empty
+ acl = bucket.get_def_acl()
+ assert acl.to_xml() == '<AccessControlList></AccessControlList>'
+ # set default acl to a canned acl and verify it gets set
+ bucket.set_def_acl('public-read')
+ acl = bucket.get_def_acl()
+ # save public-read acl for later test
+ public_read_acl = acl
+ assert acl.to_xml() == ('<AccessControlList><Entries><Entry>' +
+ '<Scope type="AllUsers"></Scope><Permission>READ</Permission>' +
+ '</Entry></Entries></AccessControlList>')
+ # back to private acl
+ bucket.set_def_acl('private')
+ acl = bucket.get_def_acl()
+ assert acl.to_xml() == '<AccessControlList></AccessControlList>'
+ # set default acl to an xml acl and verify it gets set
+ bucket.set_def_acl(public_read_acl)
+ acl = bucket.get_def_acl()
+ assert acl.to_xml() == ('<AccessControlList><Entries><Entry>' +
+ '<Scope type="AllUsers"></Scope><Permission>READ</Permission>' +
+ '</Entry></Entries></AccessControlList>')
+ # back to private acl
+ bucket.set_def_acl('private')
+ acl = bucket.get_def_acl()
+ assert acl.to_xml() == '<AccessControlList></AccessControlList>'
+ # delete bucket
+ c.delete_bucket(bucket)
+ # repeat default acl tests using boto's storage_uri interface
+ # create a new bucket
+ bucket_name = 'test-%d' % int(time.time())
+ uri = storage_uri('gs://' + bucket_name)
+ uri.create_bucket()
+ # get default acl and make sure it's empty
+ acl = uri.get_def_acl()
+ assert acl.to_xml() == '<AccessControlList></AccessControlList>'
+ # set default acl to a canned acl and verify it gets set
+ uri.set_def_acl('public-read')
+ acl = uri.get_def_acl()
+ # save public-read acl for later test
+ public_read_acl = acl
+ assert acl.to_xml() == ('<AccessControlList><Entries><Entry>' +
+ '<Scope type="AllUsers"></Scope><Permission>READ</Permission>' +
+ '</Entry></Entries></AccessControlList>')
+ # back to private acl
+ uri.set_def_acl('private')
+ acl = uri.get_def_acl()
+ assert acl.to_xml() == '<AccessControlList></AccessControlList>'
+ # set default acl to an xml acl and verify it gets set
+ uri.set_def_acl(public_read_acl)
+ acl = uri.get_def_acl()
+ assert acl.to_xml() == ('<AccessControlList><Entries><Entry>' +
+ '<Scope type="AllUsers"></Scope><Permission>READ</Permission>' +
+ '</Entry></Entries></AccessControlList>')
+ # back to private acl
+ uri.set_def_acl('private')
+ acl = uri.get_def_acl()
+ assert acl.to_xml() == '<AccessControlList></AccessControlList>'
+ # delete bucket
+ uri.delete_bucket()
+
print '--- tests completed ---'
« no previous file with comments | « tests/s3/mock_storage_service.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698