Index: third_party/gsutil/third_party/boto/tests/unit/ec2/test_ec2object.py |
diff --git a/third_party/gsutil/third_party/boto/tests/unit/ec2/test_ec2object.py b/third_party/gsutil/third_party/boto/tests/unit/ec2/test_ec2object.py |
new file mode 100755 |
index 0000000000000000000000000000000000000000..14841e91b64070107336bf171b74357429520eb0 |
--- /dev/null |
+++ b/third_party/gsutil/third_party/boto/tests/unit/ec2/test_ec2object.py |
@@ -0,0 +1,211 @@ |
+#!/usr/bin/env python |
+ |
+from tests.unit import unittest |
+from tests.unit import AWSMockServiceTestCase |
+ |
+from boto.ec2.connection import EC2Connection |
+from boto.ec2.ec2object import TaggedEC2Object |
+ |
+ |
+CREATE_TAGS_RESPONSE = br"""<?xml version="1.0" encoding="UTF-8"?> |
+<CreateTagsResponse xmlns="http://ec2.amazonaws.com/doc/2014-05-01/"> |
+ <requestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</requestId> |
+ <return>true</return> |
+</CreateTagsResponse> |
+""" |
+ |
+ |
+DELETE_TAGS_RESPONSE = br"""<?xml version="1.0" encoding="UTF-8"?> |
+<DeleteTagsResponse xmlns="http://ec2.amazonaws.com/doc/2014-05-01/"> |
+ <requestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</requestId> |
+ <return>true</return> |
+</DeleteTagsResponse> |
+""" |
+ |
+ |
+class TestAddTags(AWSMockServiceTestCase): |
+ connection_class = EC2Connection |
+ |
+ def default_body(self): |
+ return CREATE_TAGS_RESPONSE |
+ |
+ def test_add_tag(self): |
+ self.set_http_response(status_code=200) |
+ taggedEC2Object = TaggedEC2Object(self.service_connection) |
+ taggedEC2Object.id = "i-abcd1234" |
+ taggedEC2Object.tags["already_present_key"] = "already_present_value" |
+ |
+ taggedEC2Object.add_tag("new_key", "new_value") |
+ |
+ self.assert_request_parameters({ |
+ 'ResourceId.1': 'i-abcd1234', |
+ 'Action': 'CreateTags', |
+ 'Tag.1.Key': 'new_key', |
+ 'Tag.1.Value': 'new_value'}, |
+ ignore_params_values=['AWSAccessKeyId', 'SignatureMethod', |
+ 'SignatureVersion', 'Timestamp', |
+ 'Version']) |
+ |
+ self.assertEqual(taggedEC2Object.tags, { |
+ "already_present_key": "already_present_value", |
+ "new_key": "new_value"}) |
+ |
+ def test_add_tags(self): |
+ self.set_http_response(status_code=200) |
+ taggedEC2Object = TaggedEC2Object(self.service_connection) |
+ taggedEC2Object.id = "i-abcd1234" |
+ taggedEC2Object.tags["already_present_key"] = "already_present_value" |
+ |
+ taggedEC2Object.add_tags({"key1": "value1", "key2": "value2"}) |
+ |
+ self.assert_request_parameters({ |
+ 'ResourceId.1': 'i-abcd1234', |
+ 'Action': 'CreateTags', |
+ 'Tag.1.Key': 'key1', |
+ 'Tag.1.Value': 'value1', |
+ 'Tag.2.Key': 'key2', |
+ 'Tag.2.Value': 'value2'}, |
+ ignore_params_values=['AWSAccessKeyId', 'SignatureMethod', |
+ 'SignatureVersion', 'Timestamp', |
+ 'Version']) |
+ |
+ self.assertEqual(taggedEC2Object.tags, { |
+ "already_present_key": "already_present_value", |
+ "key1": "value1", |
+ "key2": "value2"}) |
+ |
+ |
+class TestRemoveTags(AWSMockServiceTestCase): |
+ connection_class = EC2Connection |
+ |
+ def default_body(self): |
+ return DELETE_TAGS_RESPONSE |
+ |
+ def test_remove_tag(self): |
+ self.set_http_response(status_code=200) |
+ taggedEC2Object = TaggedEC2Object(self.service_connection) |
+ taggedEC2Object.id = "i-abcd1234" |
+ taggedEC2Object.tags["key1"] = "value1" |
+ taggedEC2Object.tags["key2"] = "value2" |
+ |
+ taggedEC2Object.remove_tag("key1", "value1") |
+ |
+ self.assert_request_parameters({ |
+ 'ResourceId.1': 'i-abcd1234', |
+ 'Action': 'DeleteTags', |
+ 'Tag.1.Key': 'key1', |
+ 'Tag.1.Value': 'value1'}, |
+ ignore_params_values=['AWSAccessKeyId', 'SignatureMethod', |
+ 'SignatureVersion', 'Timestamp', |
+ 'Version']) |
+ |
+ self.assertEqual(taggedEC2Object.tags, {"key2": "value2"}) |
+ |
+ def test_remove_tag_no_value(self): |
+ self.set_http_response(status_code=200) |
+ taggedEC2Object = TaggedEC2Object(self.service_connection) |
+ taggedEC2Object.id = "i-abcd1234" |
+ taggedEC2Object.tags["key1"] = "value1" |
+ taggedEC2Object.tags["key2"] = "value2" |
+ |
+ taggedEC2Object.remove_tag("key1") |
+ |
+ self.assert_request_parameters({ |
+ 'ResourceId.1': 'i-abcd1234', |
+ 'Action': 'DeleteTags', |
+ 'Tag.1.Key': 'key1'}, |
+ ignore_params_values=['AWSAccessKeyId', 'SignatureMethod', |
+ 'SignatureVersion', 'Timestamp', |
+ 'Version']) |
+ |
+ self.assertEqual(taggedEC2Object.tags, {"key2": "value2"}) |
+ |
+ def test_remove_tag_empty_value(self): |
+ self.set_http_response(status_code=200) |
+ taggedEC2Object = TaggedEC2Object(self.service_connection) |
+ taggedEC2Object.id = "i-abcd1234" |
+ taggedEC2Object.tags["key1"] = "value1" |
+ taggedEC2Object.tags["key2"] = "value2" |
+ |
+ taggedEC2Object.remove_tag("key1", "") |
+ |
+ self.assert_request_parameters({ |
+ 'ResourceId.1': 'i-abcd1234', |
+ 'Action': 'DeleteTags', |
+ 'Tag.1.Key': 'key1', |
+ 'Tag.1.Value': ''}, |
+ ignore_params_values=['AWSAccessKeyId', 'SignatureMethod', |
+ 'SignatureVersion', 'Timestamp', |
+ 'Version']) |
+ |
+ self.assertEqual(taggedEC2Object.tags, |
+ {"key1": "value1", "key2": "value2"}) |
+ |
+ def test_remove_tags(self): |
+ self.set_http_response(status_code=200) |
+ taggedEC2Object = TaggedEC2Object(self.service_connection) |
+ taggedEC2Object.id = "i-abcd1234" |
+ taggedEC2Object.tags["key1"] = "value1" |
+ taggedEC2Object.tags["key2"] = "value2" |
+ |
+ taggedEC2Object.remove_tags({"key1": "value1", "key2": "value2"}) |
+ |
+ self.assert_request_parameters({ |
+ 'ResourceId.1': 'i-abcd1234', |
+ 'Action': 'DeleteTags', |
+ 'Tag.1.Key': 'key1', |
+ 'Tag.1.Value': 'value1', |
+ 'Tag.2.Key': 'key2', |
+ 'Tag.2.Value': 'value2'}, |
+ ignore_params_values=['AWSAccessKeyId', 'SignatureMethod', |
+ 'SignatureVersion', 'Timestamp', |
+ 'Version']) |
+ |
+ self.assertEqual(taggedEC2Object.tags, {}) |
+ |
+ def test_remove_tags_wrong_values(self): |
+ self.set_http_response(status_code=200) |
+ taggedEC2Object = TaggedEC2Object(self.service_connection) |
+ taggedEC2Object.id = "i-abcd1234" |
+ taggedEC2Object.tags["key1"] = "value1" |
+ taggedEC2Object.tags["key2"] = "value2" |
+ |
+ taggedEC2Object.remove_tags({"key1": "value1", "key2": "value3"}) |
+ |
+ self.assert_request_parameters({ |
+ 'ResourceId.1': 'i-abcd1234', |
+ 'Action': 'DeleteTags', |
+ 'Tag.1.Key': 'key1', |
+ 'Tag.1.Value': 'value1', |
+ 'Tag.2.Key': 'key2', |
+ 'Tag.2.Value': 'value3'}, |
+ ignore_params_values=['AWSAccessKeyId', 'SignatureMethod', |
+ 'SignatureVersion', 'Timestamp', |
+ 'Version']) |
+ |
+ self.assertEqual(taggedEC2Object.tags, {"key2": "value2"}) |
+ |
+ def test_remove_tags_none_values(self): |
+ self.set_http_response(status_code=200) |
+ taggedEC2Object = TaggedEC2Object(self.service_connection) |
+ taggedEC2Object.id = "i-abcd1234" |
+ taggedEC2Object.tags["key1"] = "value1" |
+ taggedEC2Object.tags["key2"] = "value2" |
+ |
+ taggedEC2Object.remove_tags({"key1": "value1", "key2": None}) |
+ |
+ self.assert_request_parameters({ |
+ 'ResourceId.1': 'i-abcd1234', |
+ 'Action': 'DeleteTags', |
+ 'Tag.1.Key': 'key1', |
+ 'Tag.1.Value': 'value1', |
+ 'Tag.2.Key': 'key2'}, |
+ ignore_params_values=['AWSAccessKeyId', 'SignatureMethod', |
+ 'SignatureVersion', 'Timestamp', |
+ 'Version']) |
+ |
+ self.assertEqual(taggedEC2Object.tags, {}) |
+ |
+ |
+if __name__ == '__main__': |
+ unittest.main() |