| Index: third_party/gsutil/third_party/boto/tests/unit/ec2/test_ec2object.py
|
| diff --git a/third_party/gsutil/third_party/boto/tests/unit/ec2/test_ec2object.py b/third_party/gsutil/third_party/boto/tests/unit/ec2/test_ec2object.py
|
| new file mode 100755
|
| index 0000000000000000000000000000000000000000..14841e91b64070107336bf171b74357429520eb0
|
| --- /dev/null
|
| +++ b/third_party/gsutil/third_party/boto/tests/unit/ec2/test_ec2object.py
|
| @@ -0,0 +1,211 @@
|
| +#!/usr/bin/env python
|
| +
|
| +from tests.unit import unittest
|
| +from tests.unit import AWSMockServiceTestCase
|
| +
|
| +from boto.ec2.connection import EC2Connection
|
| +from boto.ec2.ec2object import TaggedEC2Object
|
| +
|
| +
|
| +CREATE_TAGS_RESPONSE = br"""<?xml version="1.0" encoding="UTF-8"?>
|
| +<CreateTagsResponse xmlns="http://ec2.amazonaws.com/doc/2014-05-01/">
|
| + <requestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</requestId>
|
| + <return>true</return>
|
| +</CreateTagsResponse>
|
| +"""
|
| +
|
| +
|
| +DELETE_TAGS_RESPONSE = br"""<?xml version="1.0" encoding="UTF-8"?>
|
| +<DeleteTagsResponse xmlns="http://ec2.amazonaws.com/doc/2014-05-01/">
|
| + <requestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</requestId>
|
| + <return>true</return>
|
| +</DeleteTagsResponse>
|
| +"""
|
| +
|
| +
|
| +class TestAddTags(AWSMockServiceTestCase):
|
| + connection_class = EC2Connection
|
| +
|
| + def default_body(self):
|
| + return CREATE_TAGS_RESPONSE
|
| +
|
| + def test_add_tag(self):
|
| + self.set_http_response(status_code=200)
|
| + taggedEC2Object = TaggedEC2Object(self.service_connection)
|
| + taggedEC2Object.id = "i-abcd1234"
|
| + taggedEC2Object.tags["already_present_key"] = "already_present_value"
|
| +
|
| + taggedEC2Object.add_tag("new_key", "new_value")
|
| +
|
| + self.assert_request_parameters({
|
| + 'ResourceId.1': 'i-abcd1234',
|
| + 'Action': 'CreateTags',
|
| + 'Tag.1.Key': 'new_key',
|
| + 'Tag.1.Value': 'new_value'},
|
| + ignore_params_values=['AWSAccessKeyId', 'SignatureMethod',
|
| + 'SignatureVersion', 'Timestamp',
|
| + 'Version'])
|
| +
|
| + self.assertEqual(taggedEC2Object.tags, {
|
| + "already_present_key": "already_present_value",
|
| + "new_key": "new_value"})
|
| +
|
| + def test_add_tags(self):
|
| + self.set_http_response(status_code=200)
|
| + taggedEC2Object = TaggedEC2Object(self.service_connection)
|
| + taggedEC2Object.id = "i-abcd1234"
|
| + taggedEC2Object.tags["already_present_key"] = "already_present_value"
|
| +
|
| + taggedEC2Object.add_tags({"key1": "value1", "key2": "value2"})
|
| +
|
| + self.assert_request_parameters({
|
| + 'ResourceId.1': 'i-abcd1234',
|
| + 'Action': 'CreateTags',
|
| + 'Tag.1.Key': 'key1',
|
| + 'Tag.1.Value': 'value1',
|
| + 'Tag.2.Key': 'key2',
|
| + 'Tag.2.Value': 'value2'},
|
| + ignore_params_values=['AWSAccessKeyId', 'SignatureMethod',
|
| + 'SignatureVersion', 'Timestamp',
|
| + 'Version'])
|
| +
|
| + self.assertEqual(taggedEC2Object.tags, {
|
| + "already_present_key": "already_present_value",
|
| + "key1": "value1",
|
| + "key2": "value2"})
|
| +
|
| +
|
| +class TestRemoveTags(AWSMockServiceTestCase):
|
| + connection_class = EC2Connection
|
| +
|
| + def default_body(self):
|
| + return DELETE_TAGS_RESPONSE
|
| +
|
| + def test_remove_tag(self):
|
| + self.set_http_response(status_code=200)
|
| + taggedEC2Object = TaggedEC2Object(self.service_connection)
|
| + taggedEC2Object.id = "i-abcd1234"
|
| + taggedEC2Object.tags["key1"] = "value1"
|
| + taggedEC2Object.tags["key2"] = "value2"
|
| +
|
| + taggedEC2Object.remove_tag("key1", "value1")
|
| +
|
| + self.assert_request_parameters({
|
| + 'ResourceId.1': 'i-abcd1234',
|
| + 'Action': 'DeleteTags',
|
| + 'Tag.1.Key': 'key1',
|
| + 'Tag.1.Value': 'value1'},
|
| + ignore_params_values=['AWSAccessKeyId', 'SignatureMethod',
|
| + 'SignatureVersion', 'Timestamp',
|
| + 'Version'])
|
| +
|
| + self.assertEqual(taggedEC2Object.tags, {"key2": "value2"})
|
| +
|
| + def test_remove_tag_no_value(self):
|
| + self.set_http_response(status_code=200)
|
| + taggedEC2Object = TaggedEC2Object(self.service_connection)
|
| + taggedEC2Object.id = "i-abcd1234"
|
| + taggedEC2Object.tags["key1"] = "value1"
|
| + taggedEC2Object.tags["key2"] = "value2"
|
| +
|
| + taggedEC2Object.remove_tag("key1")
|
| +
|
| + self.assert_request_parameters({
|
| + 'ResourceId.1': 'i-abcd1234',
|
| + 'Action': 'DeleteTags',
|
| + 'Tag.1.Key': 'key1'},
|
| + ignore_params_values=['AWSAccessKeyId', 'SignatureMethod',
|
| + 'SignatureVersion', 'Timestamp',
|
| + 'Version'])
|
| +
|
| + self.assertEqual(taggedEC2Object.tags, {"key2": "value2"})
|
| +
|
| + def test_remove_tag_empty_value(self):
|
| + self.set_http_response(status_code=200)
|
| + taggedEC2Object = TaggedEC2Object(self.service_connection)
|
| + taggedEC2Object.id = "i-abcd1234"
|
| + taggedEC2Object.tags["key1"] = "value1"
|
| + taggedEC2Object.tags["key2"] = "value2"
|
| +
|
| + taggedEC2Object.remove_tag("key1", "")
|
| +
|
| + self.assert_request_parameters({
|
| + 'ResourceId.1': 'i-abcd1234',
|
| + 'Action': 'DeleteTags',
|
| + 'Tag.1.Key': 'key1',
|
| + 'Tag.1.Value': ''},
|
| + ignore_params_values=['AWSAccessKeyId', 'SignatureMethod',
|
| + 'SignatureVersion', 'Timestamp',
|
| + 'Version'])
|
| +
|
| + self.assertEqual(taggedEC2Object.tags,
|
| + {"key1": "value1", "key2": "value2"})
|
| +
|
| + def test_remove_tags(self):
|
| + self.set_http_response(status_code=200)
|
| + taggedEC2Object = TaggedEC2Object(self.service_connection)
|
| + taggedEC2Object.id = "i-abcd1234"
|
| + taggedEC2Object.tags["key1"] = "value1"
|
| + taggedEC2Object.tags["key2"] = "value2"
|
| +
|
| + taggedEC2Object.remove_tags({"key1": "value1", "key2": "value2"})
|
| +
|
| + self.assert_request_parameters({
|
| + 'ResourceId.1': 'i-abcd1234',
|
| + 'Action': 'DeleteTags',
|
| + 'Tag.1.Key': 'key1',
|
| + 'Tag.1.Value': 'value1',
|
| + 'Tag.2.Key': 'key2',
|
| + 'Tag.2.Value': 'value2'},
|
| + ignore_params_values=['AWSAccessKeyId', 'SignatureMethod',
|
| + 'SignatureVersion', 'Timestamp',
|
| + 'Version'])
|
| +
|
| + self.assertEqual(taggedEC2Object.tags, {})
|
| +
|
| + def test_remove_tags_wrong_values(self):
|
| + self.set_http_response(status_code=200)
|
| + taggedEC2Object = TaggedEC2Object(self.service_connection)
|
| + taggedEC2Object.id = "i-abcd1234"
|
| + taggedEC2Object.tags["key1"] = "value1"
|
| + taggedEC2Object.tags["key2"] = "value2"
|
| +
|
| + taggedEC2Object.remove_tags({"key1": "value1", "key2": "value3"})
|
| +
|
| + self.assert_request_parameters({
|
| + 'ResourceId.1': 'i-abcd1234',
|
| + 'Action': 'DeleteTags',
|
| + 'Tag.1.Key': 'key1',
|
| + 'Tag.1.Value': 'value1',
|
| + 'Tag.2.Key': 'key2',
|
| + 'Tag.2.Value': 'value3'},
|
| + ignore_params_values=['AWSAccessKeyId', 'SignatureMethod',
|
| + 'SignatureVersion', 'Timestamp',
|
| + 'Version'])
|
| +
|
| + self.assertEqual(taggedEC2Object.tags, {"key2": "value2"})
|
| +
|
| + def test_remove_tags_none_values(self):
|
| + self.set_http_response(status_code=200)
|
| + taggedEC2Object = TaggedEC2Object(self.service_connection)
|
| + taggedEC2Object.id = "i-abcd1234"
|
| + taggedEC2Object.tags["key1"] = "value1"
|
| + taggedEC2Object.tags["key2"] = "value2"
|
| +
|
| + taggedEC2Object.remove_tags({"key1": "value1", "key2": None})
|
| +
|
| + self.assert_request_parameters({
|
| + 'ResourceId.1': 'i-abcd1234',
|
| + 'Action': 'DeleteTags',
|
| + 'Tag.1.Key': 'key1',
|
| + 'Tag.1.Value': 'value1',
|
| + 'Tag.2.Key': 'key2'},
|
| + ignore_params_values=['AWSAccessKeyId', 'SignatureMethod',
|
| + 'SignatureVersion', 'Timestamp',
|
| + 'Version'])
|
| +
|
| + self.assertEqual(taggedEC2Object.tags, {})
|
| +
|
| +
|
| +if __name__ == '__main__':
|
| + unittest.main()
|
|
|