| OLD | NEW |
| (Empty) |
| 1 #!/usr/bin/env python | |
| 2 | |
| 3 # Copyright (c) 2006-2010 Mitch Garnaat http://garnaat.org/ | |
| 4 # Copyright (c) 2009, Eucalyptus Systems, Inc. | |
| 5 # All rights reserved. | |
| 6 # | |
| 7 # Permission is hereby granted, free of charge, to any person obtaining a | |
| 8 # copy of this software and associated documentation files (the | |
| 9 # "Software"), to deal in the Software without restriction, including | |
| 10 # without limitation the rights to use, copy, modify, merge, publish, dis- | |
| 11 # tribute, sublicense, and/or sell copies of the Software, and to permit | |
| 12 # persons to whom the Software is furnished to do so, subject to the fol- | |
| 13 # lowing conditions: | |
| 14 # | |
| 15 # The above copyright notice and this permission notice shall be included | |
| 16 # in all copies or substantial portions of the Software. | |
| 17 # | |
| 18 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | |
| 19 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- | |
| 20 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT | |
| 21 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, | |
| 22 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
| 23 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS | |
| 24 # IN THE SOFTWARE. | |
| 25 | |
| 26 """ | |
| 27 Some unit tests for the EC2Connection | |
| 28 """ | |
| 29 | |
| 30 import unittest | |
| 31 import time | |
| 32 from boto.ec2.connection import EC2Connection | |
| 33 import telnetlib | |
| 34 import socket | |
| 35 | |
| 36 class EC2ConnectionTest (unittest.TestCase): | |
| 37 | |
| 38 def test_1_basic(self): | |
| 39 # this is my user_id, if you want to run these tests you should | |
| 40 # replace this with yours or they won't work | |
| 41 user_id = '963068290131' | |
| 42 print '--- running EC2Connection tests ---' | |
| 43 c = EC2Connection() | |
| 44 # get list of private AMI's | |
| 45 rs = c.get_all_images(owners=[user_id]) | |
| 46 assert len(rs) > 0 | |
| 47 # now pick the first one | |
| 48 image = rs[0] | |
| 49 # temporarily make this image runnable by everyone | |
| 50 status = image.set_launch_permissions(group_names=['all']) | |
| 51 assert status | |
| 52 d = image.get_launch_permissions() | |
| 53 assert d.has_key('groups') | |
| 54 assert len(d['groups']) > 0 | |
| 55 # now remove that permission | |
| 56 status = image.remove_launch_permissions(group_names=['all']) | |
| 57 assert status | |
| 58 d = image.get_launch_permissions() | |
| 59 assert not d.has_key('groups') | |
| 60 | |
| 61 # create 2 new security groups | |
| 62 group1_name = 'test-%d' % int(time.time()) | |
| 63 group_desc = 'This is a security group created during unit testing' | |
| 64 group1 = c.create_security_group(group1_name, group_desc) | |
| 65 time.sleep(2) | |
| 66 group2_name = 'test-%d' % int(time.time()) | |
| 67 group_desc = 'This is a security group created during unit testing' | |
| 68 group2 = c.create_security_group(group2_name, group_desc) | |
| 69 # now get a listing of all security groups and look for our new one | |
| 70 rs = c.get_all_security_groups() | |
| 71 found = False | |
| 72 for g in rs: | |
| 73 if g.name == group1_name: | |
| 74 found = True | |
| 75 assert found | |
| 76 # now pass arg to filter results to only our new group | |
| 77 rs = c.get_all_security_groups([group1_name]) | |
| 78 assert len(rs) == 1 | |
| 79 # try some group to group authorizations/revocations | |
| 80 # first try the old style | |
| 81 status = c.authorize_security_group(group1.name, group2.name, group2.own
er_id) | |
| 82 assert status | |
| 83 status = c.revoke_security_group(group1.name, group2.name, group2.owner_
id) | |
| 84 assert status | |
| 85 # now try specifying a specific port | |
| 86 status = c.authorize_security_group(group1.name, group2.name, group2.own
er_id, | |
| 87 'tcp', 22, 22) | |
| 88 assert status | |
| 89 status = c.revoke_security_group(group1.name, group2.name, group2.owner_
id, | |
| 90 'tcp', 22, 22) | |
| 91 assert status | |
| 92 | |
| 93 # now delete the second security group | |
| 94 status = c.delete_security_group(group2_name) | |
| 95 # now make sure it's really gone | |
| 96 rs = c.get_all_security_groups() | |
| 97 found = False | |
| 98 for g in rs: | |
| 99 if g.name == group2_name: | |
| 100 found = True | |
| 101 assert not found | |
| 102 | |
| 103 group = group1 | |
| 104 | |
| 105 # now try to launch apache image with our new security group | |
| 106 rs = c.get_all_images() | |
| 107 img_loc = 'ec2-public-images/fedora-core4-apache.manifest.xml' | |
| 108 for image in rs: | |
| 109 if image.location == img_loc: | |
| 110 break | |
| 111 reservation = image.run(security_groups=[group.name]) | |
| 112 instance = reservation.instances[0] | |
| 113 while instance.state != 'running': | |
| 114 print '\tinstance is %s' % instance.state | |
| 115 time.sleep(30) | |
| 116 instance.update() | |
| 117 # instance in now running, try to telnet to port 80 | |
| 118 t = telnetlib.Telnet() | |
| 119 try: | |
| 120 t.open(instance.dns_name, 80) | |
| 121 except socket.error: | |
| 122 pass | |
| 123 # now open up port 80 and try again, it should work | |
| 124 group.authorize('tcp', 80, 80, '0.0.0.0/0') | |
| 125 t.open(instance.dns_name, 80) | |
| 126 t.close() | |
| 127 # now revoke authorization and try again | |
| 128 group.revoke('tcp', 80, 80, '0.0.0.0/0') | |
| 129 try: | |
| 130 t.open(instance.dns_name, 80) | |
| 131 except socket.error: | |
| 132 pass | |
| 133 # now kill the instance and delete the security group | |
| 134 instance.terminate() | |
| 135 # unfortunately, I can't delete the sg within this script | |
| 136 #sg.delete() | |
| 137 | |
| 138 # create a new key pair | |
| 139 key_name = 'test-%d' % int(time.time()) | |
| 140 status = c.create_key_pair(key_name) | |
| 141 assert status | |
| 142 # now get a listing of all key pairs and look for our new one | |
| 143 rs = c.get_all_key_pairs() | |
| 144 found = False | |
| 145 for k in rs: | |
| 146 if k.name == key_name: | |
| 147 found = True | |
| 148 assert found | |
| 149 # now pass arg to filter results to only our new key pair | |
| 150 rs = c.get_all_key_pairs([key_name]) | |
| 151 assert len(rs) == 1 | |
| 152 key_pair = rs[0] | |
| 153 # now delete the key pair | |
| 154 status = c.delete_key_pair(key_name) | |
| 155 # now make sure it's really gone | |
| 156 rs = c.get_all_key_pairs() | |
| 157 found = False | |
| 158 for k in rs: | |
| 159 if k.name == key_name: | |
| 160 found = True | |
| 161 assert not found | |
| 162 | |
| 163 # short test around Paid AMI capability | |
| 164 demo_paid_ami_id = 'ami-bd9d78d4' | |
| 165 demo_paid_ami_product_code = 'A79EC0DB' | |
| 166 l = c.get_all_images([demo_paid_ami_id]) | |
| 167 assert len(l) == 1 | |
| 168 assert len(l[0].product_codes) == 1 | |
| 169 assert l[0].product_codes[0] == demo_paid_ami_product_code | |
| 170 | |
| 171 print '--- tests completed ---' | |
| OLD | NEW |