| OLD | NEW |
| (Empty) | |
| 1 #!/usr/bin/env python |
| 2 |
| 3 from tests.unit import unittest |
| 4 from tests.unit import AWSMockServiceTestCase |
| 5 |
| 6 from boto.ec2.connection import EC2Connection |
| 7 from boto.ec2.ec2object import TaggedEC2Object |
| 8 |
| 9 |
| 10 CREATE_TAGS_RESPONSE = r"""<?xml version="1.0" encoding="UTF-8"?> |
| 11 <CreateTagsResponse xmlns="http://ec2.amazonaws.com/doc/2014-05-01/"> |
| 12 <requestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</requestId> |
| 13 <return>true</return> |
| 14 </CreateTagsResponse> |
| 15 """ |
| 16 |
| 17 |
| 18 DELETE_TAGS_RESPONSE = r"""<?xml version="1.0" encoding="UTF-8"?> |
| 19 <DeleteTagsResponse xmlns="http://ec2.amazonaws.com/doc/2014-05-01/"> |
| 20 <requestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</requestId> |
| 21 <return>true</return> |
| 22 </DeleteTagsResponse> |
| 23 """ |
| 24 |
| 25 |
| 26 class TestAddTags(AWSMockServiceTestCase): |
| 27 connection_class = EC2Connection |
| 28 |
| 29 def default_body(self): |
| 30 return CREATE_TAGS_RESPONSE |
| 31 |
| 32 def test_add_tag(self): |
| 33 self.set_http_response(status_code=200) |
| 34 taggedEC2Object = TaggedEC2Object(self.service_connection) |
| 35 taggedEC2Object.id = "i-abcd1234" |
| 36 taggedEC2Object.tags["already_present_key"] = "already_present_value" |
| 37 |
| 38 taggedEC2Object.add_tag("new_key", "new_value") |
| 39 |
| 40 self.assert_request_parameters({ |
| 41 'ResourceId.1': 'i-abcd1234', |
| 42 'Action': 'CreateTags', |
| 43 'Tag.1.Key': 'new_key', |
| 44 'Tag.1.Value': 'new_value'}, |
| 45 ignore_params_values=['AWSAccessKeyId', 'SignatureMethod', |
| 46 'SignatureVersion', 'Timestamp', |
| 47 'Version']) |
| 48 |
| 49 self.assertEqual(taggedEC2Object.tags, { |
| 50 "already_present_key":"already_present_value", |
| 51 "new_key":"new_value"}) |
| 52 |
| 53 def test_add_tags(self): |
| 54 self.set_http_response(status_code=200) |
| 55 taggedEC2Object = TaggedEC2Object(self.service_connection) |
| 56 taggedEC2Object.id = "i-abcd1234" |
| 57 taggedEC2Object.tags["already_present_key"] = "already_present_value" |
| 58 |
| 59 taggedEC2Object.add_tags({"key1":"value1", "key2":"value2"}) |
| 60 |
| 61 self.assert_request_parameters({ |
| 62 'ResourceId.1': 'i-abcd1234', |
| 63 'Action': 'CreateTags', |
| 64 'Tag.1.Key': 'key1', |
| 65 'Tag.1.Value': 'value1', |
| 66 'Tag.2.Key': 'key2', |
| 67 'Tag.2.Value': 'value2'}, |
| 68 ignore_params_values=['AWSAccessKeyId', 'SignatureMethod', |
| 69 'SignatureVersion', 'Timestamp', |
| 70 'Version']) |
| 71 |
| 72 self.assertEqual(taggedEC2Object.tags, { |
| 73 "already_present_key":"already_present_value", |
| 74 "key1":"value1", |
| 75 "key2": "value2"}) |
| 76 |
| 77 |
| 78 class TestRemoveTags(AWSMockServiceTestCase): |
| 79 connection_class = EC2Connection |
| 80 |
| 81 def default_body(self): |
| 82 return DELETE_TAGS_RESPONSE |
| 83 |
| 84 def test_remove_tag(self): |
| 85 self.set_http_response(status_code=200) |
| 86 taggedEC2Object = TaggedEC2Object(self.service_connection) |
| 87 taggedEC2Object.id = "i-abcd1234" |
| 88 taggedEC2Object.tags["key1"] = "value1" |
| 89 taggedEC2Object.tags["key2"] = "value2" |
| 90 |
| 91 taggedEC2Object.remove_tag("key1", "value1") |
| 92 |
| 93 self.assert_request_parameters({ |
| 94 'ResourceId.1': 'i-abcd1234', |
| 95 'Action': 'DeleteTags', |
| 96 'Tag.1.Key': 'key1', |
| 97 'Tag.1.Value': 'value1'}, |
| 98 ignore_params_values=['AWSAccessKeyId', 'SignatureMethod', |
| 99 'SignatureVersion', 'Timestamp', |
| 100 'Version']) |
| 101 |
| 102 self.assertEqual(taggedEC2Object.tags, {"key2":"value2"}) |
| 103 |
| 104 def test_remove_tag_no_value(self): |
| 105 self.set_http_response(status_code=200) |
| 106 taggedEC2Object = TaggedEC2Object(self.service_connection) |
| 107 taggedEC2Object.id = "i-abcd1234" |
| 108 taggedEC2Object.tags["key1"] = "value1" |
| 109 taggedEC2Object.tags["key2"] = "value2" |
| 110 |
| 111 taggedEC2Object.remove_tag("key1") |
| 112 |
| 113 self.assert_request_parameters({ |
| 114 'ResourceId.1': 'i-abcd1234', |
| 115 'Action': 'DeleteTags', |
| 116 'Tag.1.Key': 'key1'}, |
| 117 ignore_params_values=['AWSAccessKeyId', 'SignatureMethod', |
| 118 'SignatureVersion', 'Timestamp', |
| 119 'Version']) |
| 120 |
| 121 self.assertEqual(taggedEC2Object.tags, {"key2":"value2"}) |
| 122 |
| 123 def test_remove_tag_empty_value(self): |
| 124 self.set_http_response(status_code=200) |
| 125 taggedEC2Object = TaggedEC2Object(self.service_connection) |
| 126 taggedEC2Object.id = "i-abcd1234" |
| 127 taggedEC2Object.tags["key1"] = "value1" |
| 128 taggedEC2Object.tags["key2"] = "value2" |
| 129 |
| 130 taggedEC2Object.remove_tag("key1", "") |
| 131 |
| 132 self.assert_request_parameters({ |
| 133 'ResourceId.1': 'i-abcd1234', |
| 134 'Action': 'DeleteTags', |
| 135 'Tag.1.Key': 'key1', |
| 136 'Tag.1.Value': ''}, |
| 137 ignore_params_values=['AWSAccessKeyId', 'SignatureMethod', |
| 138 'SignatureVersion', 'Timestamp', |
| 139 'Version']) |
| 140 |
| 141 self.assertEqual(taggedEC2Object.tags, {"key2":"value2"}) |
| 142 |
| 143 |
| 144 if __name__ == '__main__': |
| 145 unittest.main() |
| OLD | NEW |