Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(242)

Side by Side Diff: tests/s3/test_gsconnection.py

Issue 8669001: Pull in upstream boto from github at bcb719937de9ac2851e632d62b777352029a6d55 (Closed) Base URL: svn://svn.chromium.org/boto
Patch Set: Created 9 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « tests/s3/mock_storage_service.py ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # -*- coding: utf-8 -*- 1 # -*- coding: utf-8 -*-
2 # Copyright (c) 2006-2011 Mitch Garnaat http://garnaat.org/ 2 # Copyright (c) 2006-2011 Mitch Garnaat http://garnaat.org/
3 # Copyright (c) 2010, Eucalyptus Systems, Inc. 3 # Copyright (c) 2010, Eucalyptus Systems, Inc.
4 # Copyright (c) 2011, Nexenta Systems, Inc. 4 # Copyright (c) 2011, Nexenta Systems, Inc.
5 # All rights reserved. 5 # All rights reserved.
6 # 6 #
7 # Permission is hereby granted, free of charge, to any person obtaining a 7 # Permission is hereby granted, free of charge, to any person obtaining a
8 # copy of this software and associated documentation files (the 8 # copy of this software and associated documentation files (the
9 # "Software"), to deal in the Software without restriction, including 9 # "Software"), to deal in the Software without restriction, including
10 # without limitation the rights to use, copy, modify, merge, publish, dis- 10 # without limitation the rights to use, copy, modify, merge, publish, dis-
(...skipping 13 matching lines...) Expand all
24 # IN THE SOFTWARE. 24 # IN THE SOFTWARE.
25 25
26 """ 26 """
27 Some unit tests for the GSConnection 27 Some unit tests for the GSConnection
28 """ 28 """
29 29
30 import unittest 30 import unittest
31 import time 31 import time
32 import os 32 import os
33 from boto.gs.connection import GSConnection 33 from boto.gs.connection import GSConnection
34 from boto import storage_uri
34 35
35 class GSConnectionTest (unittest.TestCase): 36 class GSConnectionTest (unittest.TestCase):
36 37
37 def test_1_basic(self): 38 def test_1_basic(self):
39 """basic regression test for Google Cloud Storage"""
38 print '--- running GSConnection tests ---' 40 print '--- running GSConnection tests ---'
39 c = GSConnection() 41 c = GSConnection()
40 # create a new, empty bucket 42 # create a new, empty bucket
41 bucket_name = 'test-%d' % int(time.time()) 43 bucket_name = 'test-%d' % int(time.time())
42 bucket = c.create_bucket(bucket_name) 44 bucket = c.create_bucket(bucket_name)
43 # now try a get_bucket call and see if it's really there 45 # now try a get_bucket call and see if it's really there
44 bucket = c.get_bucket(bucket_name) 46 bucket = c.get_bucket(bucket_name)
45 k = bucket.new_key() 47 k = bucket.new_key()
46 k.name = 'foobar' 48 k.name = 'foobar'
47 s1 = 'This is a test of file upload and download' 49 s1 = 'This is a test of file upload and download'
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after
169 assert bucket.get_subresource('logging') == empty_logging_str 171 assert bucket.get_subresource('logging') == empty_logging_str
170 bucket.enable_logging('log-bucket', 'example', 172 bucket.enable_logging('log-bucket', 'example',
171 canned_acl='bucket-owner-full-control') 173 canned_acl='bucket-owner-full-control')
172 assert bucket.get_subresource('logging') == logging_str; 174 assert bucket.get_subresource('logging') == logging_str;
173 # now delete all keys in bucket 175 # now delete all keys in bucket
174 for k in bucket: 176 for k in bucket:
175 bucket.delete_key(k) 177 bucket.delete_key(k)
176 # now delete bucket 178 # now delete bucket
177 time.sleep(5) 179 time.sleep(5)
178 c.delete_bucket(bucket) 180 c.delete_bucket(bucket)
181
182 def test_2_copy_key(self):
183 """test copying a key from one bucket to another"""
184 c = GSConnection()
185 # create two new, empty buckets
186 bucket_name_1 = 'test1-%d' % int(time.time())
187 bucket_name_2 = 'test2-%d' % int(time.time())
188 bucket1 = c.create_bucket(bucket_name_1)
189 bucket2 = c.create_bucket(bucket_name_2)
190 # verify buckets got created
191 bucket1 = c.get_bucket(bucket_name_1)
192 bucket2 = c.get_bucket(bucket_name_2)
193 # create a key in bucket1 and give it some content
194 k1 = bucket1.new_key()
195 assert isinstance(k1, bucket1.key_class)
196 key_name = 'foobar'
197 k1.name = key_name
198 s = 'This is a test.'
199 k1.set_contents_from_string(s)
200 # copy the new key from bucket1 to bucket2
201 k1.copy(bucket_name_2, key_name)
202 # now copy the contents from bucket2 to a local file
203 k2 = bucket2.lookup(key_name)
204 assert isinstance(k2, bucket2.key_class)
205 fp = open('foobar', 'wb')
206 k2.get_contents_to_file(fp)
207 fp.close()
208 fp = open('foobar')
209 # check to make sure content read is identical to original
210 assert s == fp.read(), 'move test failed!'
211 fp.close()
212 # delete keys
213 bucket1.delete_key(k1)
214 bucket2.delete_key(k2)
215 # delete test buckets
216 c.delete_bucket(bucket1)
217 c.delete_bucket(bucket2)
218
219 def test_3_default_object_acls(self):
220 """test default object acls"""
221 c = GSConnection()
222 # create a new bucket
223 bucket_name = 'test-%d' % int(time.time())
224 bucket = c.create_bucket(bucket_name)
225 # now call get_bucket to see if it's really there
226 bucket = c.get_bucket(bucket_name)
227 # get default acl and make sure it's empty
228 acl = bucket.get_def_acl()
229 assert acl.to_xml() == '<AccessControlList></AccessControlList>'
230 # set default acl to a canned acl and verify it gets set
231 bucket.set_def_acl('public-read')
232 acl = bucket.get_def_acl()
233 # save public-read acl for later test
234 public_read_acl = acl
235 assert acl.to_xml() == ('<AccessControlList><Entries><Entry>' +
236 '<Scope type="AllUsers"></Scope><Permission>READ</Permission>' +
237 '</Entry></Entries></AccessControlList>')
238 # back to private acl
239 bucket.set_def_acl('private')
240 acl = bucket.get_def_acl()
241 assert acl.to_xml() == '<AccessControlList></AccessControlList>'
242 # set default acl to an xml acl and verify it gets set
243 bucket.set_def_acl(public_read_acl)
244 acl = bucket.get_def_acl()
245 assert acl.to_xml() == ('<AccessControlList><Entries><Entry>' +
246 '<Scope type="AllUsers"></Scope><Permission>READ</Permission>' +
247 '</Entry></Entries></AccessControlList>')
248 # back to private acl
249 bucket.set_def_acl('private')
250 acl = bucket.get_def_acl()
251 assert acl.to_xml() == '<AccessControlList></AccessControlList>'
252 # delete bucket
253 c.delete_bucket(bucket)
254 # repeat default acl tests using boto's storage_uri interface
255 # create a new bucket
256 bucket_name = 'test-%d' % int(time.time())
257 uri = storage_uri('gs://' + bucket_name)
258 uri.create_bucket()
259 # get default acl and make sure it's empty
260 acl = uri.get_def_acl()
261 assert acl.to_xml() == '<AccessControlList></AccessControlList>'
262 # set default acl to a canned acl and verify it gets set
263 uri.set_def_acl('public-read')
264 acl = uri.get_def_acl()
265 # save public-read acl for later test
266 public_read_acl = acl
267 assert acl.to_xml() == ('<AccessControlList><Entries><Entry>' +
268 '<Scope type="AllUsers"></Scope><Permission>READ</Permission>' +
269 '</Entry></Entries></AccessControlList>')
270 # back to private acl
271 uri.set_def_acl('private')
272 acl = uri.get_def_acl()
273 assert acl.to_xml() == '<AccessControlList></AccessControlList>'
274 # set default acl to an xml acl and verify it gets set
275 uri.set_def_acl(public_read_acl)
276 acl = uri.get_def_acl()
277 assert acl.to_xml() == ('<AccessControlList><Entries><Entry>' +
278 '<Scope type="AllUsers"></Scope><Permission>READ</Permission>' +
279 '</Entry></Entries></AccessControlList>')
280 # back to private acl
281 uri.set_def_acl('private')
282 acl = uri.get_def_acl()
283 assert acl.to_xml() == '<AccessControlList></AccessControlList>'
284 # delete bucket
285 uri.delete_bucket()
286
179 print '--- tests completed ---' 287 print '--- tests completed ---'
OLDNEW
« no previous file with comments | « tests/s3/mock_storage_service.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698