OLD | NEW |
1 # -*- coding: utf-8 -*- | 1 # -*- coding: utf-8 -*- |
2 # Copyright (c) 2006-2011 Mitch Garnaat http://garnaat.org/ | 2 # Copyright (c) 2006-2011 Mitch Garnaat http://garnaat.org/ |
3 # Copyright (c) 2010, Eucalyptus Systems, Inc. | 3 # Copyright (c) 2010, Eucalyptus Systems, Inc. |
4 # Copyright (c) 2011, Nexenta Systems, Inc. | 4 # Copyright (c) 2011, Nexenta Systems, Inc. |
5 # All rights reserved. | 5 # All rights reserved. |
6 # | 6 # |
7 # Permission is hereby granted, free of charge, to any person obtaining a | 7 # Permission is hereby granted, free of charge, to any person obtaining a |
8 # copy of this software and associated documentation files (the | 8 # copy of this software and associated documentation files (the |
9 # "Software"), to deal in the Software without restriction, including | 9 # "Software"), to deal in the Software without restriction, including |
10 # without limitation the rights to use, copy, modify, merge, publish, dis- | 10 # without limitation the rights to use, copy, modify, merge, publish, dis- |
(...skipping 13 matching lines...) Expand all Loading... |
24 # IN THE SOFTWARE. | 24 # IN THE SOFTWARE. |
25 | 25 |
26 """ | 26 """ |
27 Some unit tests for the GSConnection | 27 Some unit tests for the GSConnection |
28 """ | 28 """ |
29 | 29 |
30 import unittest | 30 import unittest |
31 import time | 31 import time |
32 import os | 32 import os |
33 from boto.gs.connection import GSConnection | 33 from boto.gs.connection import GSConnection |
| 34 from boto import storage_uri |
34 | 35 |
35 class GSConnectionTest (unittest.TestCase): | 36 class GSConnectionTest (unittest.TestCase): |
36 | 37 |
37 def test_1_basic(self): | 38 def test_1_basic(self): |
| 39 """basic regression test for Google Cloud Storage""" |
38 print '--- running GSConnection tests ---' | 40 print '--- running GSConnection tests ---' |
39 c = GSConnection() | 41 c = GSConnection() |
40 # create a new, empty bucket | 42 # create a new, empty bucket |
41 bucket_name = 'test-%d' % int(time.time()) | 43 bucket_name = 'test-%d' % int(time.time()) |
42 bucket = c.create_bucket(bucket_name) | 44 bucket = c.create_bucket(bucket_name) |
43 # now try a get_bucket call and see if it's really there | 45 # now try a get_bucket call and see if it's really there |
44 bucket = c.get_bucket(bucket_name) | 46 bucket = c.get_bucket(bucket_name) |
45 k = bucket.new_key() | 47 k = bucket.new_key() |
46 k.name = 'foobar' | 48 k.name = 'foobar' |
47 s1 = 'This is a test of file upload and download' | 49 s1 = 'This is a test of file upload and download' |
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
169 assert bucket.get_subresource('logging') == empty_logging_str | 171 assert bucket.get_subresource('logging') == empty_logging_str |
170 bucket.enable_logging('log-bucket', 'example', | 172 bucket.enable_logging('log-bucket', 'example', |
171 canned_acl='bucket-owner-full-control') | 173 canned_acl='bucket-owner-full-control') |
172 assert bucket.get_subresource('logging') == logging_str; | 174 assert bucket.get_subresource('logging') == logging_str; |
173 # now delete all keys in bucket | 175 # now delete all keys in bucket |
174 for k in bucket: | 176 for k in bucket: |
175 bucket.delete_key(k) | 177 bucket.delete_key(k) |
176 # now delete bucket | 178 # now delete bucket |
177 time.sleep(5) | 179 time.sleep(5) |
178 c.delete_bucket(bucket) | 180 c.delete_bucket(bucket) |
| 181 |
| 182 def test_2_copy_key(self): |
| 183 """test copying a key from one bucket to another""" |
| 184 c = GSConnection() |
| 185 # create two new, empty buckets |
| 186 bucket_name_1 = 'test1-%d' % int(time.time()) |
| 187 bucket_name_2 = 'test2-%d' % int(time.time()) |
| 188 bucket1 = c.create_bucket(bucket_name_1) |
| 189 bucket2 = c.create_bucket(bucket_name_2) |
| 190 # verify buckets got created |
| 191 bucket1 = c.get_bucket(bucket_name_1) |
| 192 bucket2 = c.get_bucket(bucket_name_2) |
| 193 # create a key in bucket1 and give it some content |
| 194 k1 = bucket1.new_key() |
| 195 assert isinstance(k1, bucket1.key_class) |
| 196 key_name = 'foobar' |
| 197 k1.name = key_name |
| 198 s = 'This is a test.' |
| 199 k1.set_contents_from_string(s) |
| 200 # copy the new key from bucket1 to bucket2 |
| 201 k1.copy(bucket_name_2, key_name) |
| 202 # now copy the contents from bucket2 to a local file |
| 203 k2 = bucket2.lookup(key_name) |
| 204 assert isinstance(k2, bucket2.key_class) |
| 205 fp = open('foobar', 'wb') |
| 206 k2.get_contents_to_file(fp) |
| 207 fp.close() |
| 208 fp = open('foobar') |
| 209 # check to make sure content read is identical to original |
| 210 assert s == fp.read(), 'move test failed!' |
| 211 fp.close() |
| 212 # delete keys |
| 213 bucket1.delete_key(k1) |
| 214 bucket2.delete_key(k2) |
| 215 # delete test buckets |
| 216 c.delete_bucket(bucket1) |
| 217 c.delete_bucket(bucket2) |
| 218 |
| 219 def test_3_default_object_acls(self): |
| 220 """test default object acls""" |
| 221 c = GSConnection() |
| 222 # create a new bucket |
| 223 bucket_name = 'test-%d' % int(time.time()) |
| 224 bucket = c.create_bucket(bucket_name) |
| 225 # now call get_bucket to see if it's really there |
| 226 bucket = c.get_bucket(bucket_name) |
| 227 # get default acl and make sure it's empty |
| 228 acl = bucket.get_def_acl() |
| 229 assert acl.to_xml() == '<AccessControlList></AccessControlList>' |
| 230 # set default acl to a canned acl and verify it gets set |
| 231 bucket.set_def_acl('public-read') |
| 232 acl = bucket.get_def_acl() |
| 233 # save public-read acl for later test |
| 234 public_read_acl = acl |
| 235 assert acl.to_xml() == ('<AccessControlList><Entries><Entry>' + |
| 236 '<Scope type="AllUsers"></Scope><Permission>READ</Permission>' + |
| 237 '</Entry></Entries></AccessControlList>') |
| 238 # back to private acl |
| 239 bucket.set_def_acl('private') |
| 240 acl = bucket.get_def_acl() |
| 241 assert acl.to_xml() == '<AccessControlList></AccessControlList>' |
| 242 # set default acl to an xml acl and verify it gets set |
| 243 bucket.set_def_acl(public_read_acl) |
| 244 acl = bucket.get_def_acl() |
| 245 assert acl.to_xml() == ('<AccessControlList><Entries><Entry>' + |
| 246 '<Scope type="AllUsers"></Scope><Permission>READ</Permission>' + |
| 247 '</Entry></Entries></AccessControlList>') |
| 248 # back to private acl |
| 249 bucket.set_def_acl('private') |
| 250 acl = bucket.get_def_acl() |
| 251 assert acl.to_xml() == '<AccessControlList></AccessControlList>' |
| 252 # delete bucket |
| 253 c.delete_bucket(bucket) |
| 254 # repeat default acl tests using boto's storage_uri interface |
| 255 # create a new bucket |
| 256 bucket_name = 'test-%d' % int(time.time()) |
| 257 uri = storage_uri('gs://' + bucket_name) |
| 258 uri.create_bucket() |
| 259 # get default acl and make sure it's empty |
| 260 acl = uri.get_def_acl() |
| 261 assert acl.to_xml() == '<AccessControlList></AccessControlList>' |
| 262 # set default acl to a canned acl and verify it gets set |
| 263 uri.set_def_acl('public-read') |
| 264 acl = uri.get_def_acl() |
| 265 # save public-read acl for later test |
| 266 public_read_acl = acl |
| 267 assert acl.to_xml() == ('<AccessControlList><Entries><Entry>' + |
| 268 '<Scope type="AllUsers"></Scope><Permission>READ</Permission>' + |
| 269 '</Entry></Entries></AccessControlList>') |
| 270 # back to private acl |
| 271 uri.set_def_acl('private') |
| 272 acl = uri.get_def_acl() |
| 273 assert acl.to_xml() == '<AccessControlList></AccessControlList>' |
| 274 # set default acl to an xml acl and verify it gets set |
| 275 uri.set_def_acl(public_read_acl) |
| 276 acl = uri.get_def_acl() |
| 277 assert acl.to_xml() == ('<AccessControlList><Entries><Entry>' + |
| 278 '<Scope type="AllUsers"></Scope><Permission>READ</Permission>' + |
| 279 '</Entry></Entries></AccessControlList>') |
| 280 # back to private acl |
| 281 uri.set_def_acl('private') |
| 282 acl = uri.get_def_acl() |
| 283 assert acl.to_xml() == '<AccessControlList></AccessControlList>' |
| 284 # delete bucket |
| 285 uri.delete_bucket() |
| 286 |
179 print '--- tests completed ---' | 287 print '--- tests completed ---' |
OLD | NEW |