OLD | NEW |
(Empty) | |
| 1 #!/usr/bin/env python |
| 2 # Copyright (c) 2012 Amazon.com, Inc. or its affiliates. All Rights Reserved |
| 3 # |
| 4 # Permission is hereby granted, free of charge, to any person obtaining a |
| 5 # copy of this software and associated documentation files (the |
| 6 # "Software"), to deal in the Software without restriction, including |
| 7 # without limitation the rights to use, copy, modify, merge, publish, dis- |
| 8 # tribute, sublicense, and/or sell copies of the Software, and to permit |
| 9 # persons to whom the Software is furnished to do so, subject to the fol- |
| 10 # lowing conditions: |
| 11 # |
| 12 # The above copyright notice and this permission notice shall be included |
| 13 # in all copies or substantial portions of the Software. |
| 14 # |
| 15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS |
| 16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- |
| 17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT |
| 18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, |
| 19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| 20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
| 21 # IN THE SOFTWARE. |
| 22 # |
| 23 import time |
| 24 from tests.unit import unittest |
| 25 |
| 26 from boto.datapipeline import layer1 |
| 27 |
| 28 |
| 29 class TestDataPipeline(unittest.TestCase): |
| 30 datapipeline = True |
| 31 |
| 32 def setUp(self): |
| 33 self.connection = layer1.DataPipelineConnection() |
| 34 self.sample_pipeline_objects = [ |
| 35 {'fields': [ |
| 36 {'key': 'workerGroup', 'stringValue': 'MyworkerGroup'}], |
| 37 'id': 'Default', |
| 38 'name': 'Default'}, |
| 39 {'fields': [ |
| 40 {'key': 'startDateTime', 'stringValue': '2012-09-25T17:00:00'}, |
| 41 {'key': 'type', 'stringValue': 'Schedule'}, |
| 42 {'key': 'period', 'stringValue': '1 hour'}, |
| 43 {'key': 'endDateTime', 'stringValue': '2012-09-25T18:00:00'}], |
| 44 'id': 'Schedule', |
| 45 'name': 'Schedule'}, |
| 46 {'fields': [ |
| 47 {'key': 'type', 'stringValue': 'ShellCommandActivity'}, |
| 48 {'key': 'command', 'stringValue': 'echo hello'}, |
| 49 {'key': 'parent', 'refValue': 'Default'}, |
| 50 {'key': 'schedule', 'refValue': 'Schedule'}], |
| 51 'id': 'SayHello', |
| 52 'name': 'SayHello'} |
| 53 ] |
| 54 self.connection.auth_service_name = 'datapipeline' |
| 55 |
| 56 def create_pipeline(self, name, unique_id, description=None): |
| 57 response = self.connection.create_pipeline(name, unique_id, |
| 58 description) |
| 59 pipeline_id = response['pipelineId'] |
| 60 self.addCleanup(self.connection.delete_pipeline, pipeline_id) |
| 61 return pipeline_id |
| 62 |
| 63 def get_pipeline_state(self, pipeline_id): |
| 64 response = self.connection.describe_pipelines([pipeline_id]) |
| 65 for attr in response['pipelineDescriptionList'][0]['fields']: |
| 66 if attr['key'] == '@pipelineState': |
| 67 return attr['stringValue'] |
| 68 |
| 69 def test_can_create_and_delete_a_pipeline(self): |
| 70 response = self.connection.create_pipeline('name', 'unique_id', |
| 71 'description') |
| 72 self.connection.delete_pipeline(response['pipelineId']) |
| 73 |
| 74 def test_validate_pipeline(self): |
| 75 pipeline_id = self.create_pipeline('name2', 'unique_id2') |
| 76 |
| 77 self.connection.validate_pipeline_definition( |
| 78 self.sample_pipeline_objects, pipeline_id) |
| 79 |
| 80 def test_put_pipeline_definition(self): |
| 81 pipeline_id = self.create_pipeline('name3', 'unique_id3') |
| 82 self.connection.put_pipeline_definition(self.sample_pipeline_objects, |
| 83 pipeline_id) |
| 84 |
| 85 # We should now be able to get the pipeline definition and see |
| 86 # that it matches what we put. |
| 87 response = self.connection.get_pipeline_definition(pipeline_id) |
| 88 objects = response['pipelineObjects'] |
| 89 self.assertEqual(len(objects), 3) |
| 90 self.assertEqual(objects[0]['id'], 'Default') |
| 91 self.assertEqual(objects[0]['name'], 'Default') |
| 92 self.assertEqual(objects[0]['fields'], |
| 93 [{'key': 'workerGroup', 'stringValue': 'MyworkerGroup'}
]) |
| 94 |
| 95 def test_activate_pipeline(self): |
| 96 pipeline_id = self.create_pipeline('name4', 'unique_id4') |
| 97 self.connection.put_pipeline_definition(self.sample_pipeline_objects, |
| 98 pipeline_id) |
| 99 self.connection.activate_pipeline(pipeline_id) |
| 100 |
| 101 attempts = 0 |
| 102 state = self.get_pipeline_state(pipeline_id) |
| 103 while state != 'SCHEDULED' and attempts < 10: |
| 104 time.sleep(10) |
| 105 attempts += 1 |
| 106 state = self.get_pipeline_state(pipeline_id) |
| 107 if attempts > 10: |
| 108 self.fail("Pipeline did not become scheduled " |
| 109 "after 10 attempts.") |
| 110 objects = self.connection.describe_objects(['Default'], pipeline_id) |
| 111 field = objects['pipelineObjects'][0]['fields'][0] |
| 112 self.assertDictEqual(field, {'stringValue': 'COMPONENT', 'key': '@sphere
'}) |
| 113 |
| 114 def test_list_pipelines(self): |
| 115 pipeline_id = self.create_pipeline('name5', 'unique_id5') |
| 116 pipeline_id_list = [p['id'] for p in |
| 117 self.connection.list_pipelines()['pipelineIdList']] |
| 118 self.assertTrue(pipeline_id in pipeline_id_list) |
| 119 |
| 120 |
| 121 if __name__ == '__main__': |
| 122 unittest.main() |
OLD | NEW |